Skip to content

Commit

Permalink
Fix column types when default column is being patched for missing data features (#1188)
Browse files Browse the repository at this point in the history

* Attempt to fix col type issues

* Add logs and version bump

* Attempt to fix missing features data type

* with SWA working

* revert change in /FeatureValueToColumnConverter.scala

* Address comments

---------

Co-authored-by: Rakesh Kashyap Hanasoge Padmanabha <rkashyap@rkashyap-mn3.linkedin.biz>
  • Loading branch information
rakeshkashyap123 and Rakesh Kashyap Hanasoge Padmanabha committed Jun 5, 2023
1 parent 05f5013 commit cb6032d
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,10 @@ private[offline] class AnchoredFeatureJoinStep(
// We try to guess the column data type from the configured feature type. If feature type is not present, we will default to
// default feathr behavior of returning a map column of string to float.
val obsDfWithDefaultNullColumn = missingFeatures.keys.foldLeft(dataframe) { (observationDF, featureName) =>
val featureColumnType = if (featureTypes.contains(featureName)) {
featureTypes(featureName).getFeatureType match {
case FeatureTypes.NUMERIC => FloatType
case FeatureTypes.BOOLEAN => BooleanType
case FeatureTypes.DENSE_VECTOR => ArrayType(FloatType)
case FeatureTypes.CATEGORICAL => StringType
case FeatureTypes.CATEGORICAL_SET => ArrayType(StringType)
case FeatureTypes.TERM_VECTOR => ArrayType(StructType(Seq(StructField("key", StringType), StructField("value", FloatType))))
case FeatureTypes.TENSOR => ArrayType(StructType(Seq(StructField("key", StringType),StructField("value", FloatType))))
case FeatureTypes.UNSPECIFIED => ArrayType(StructType(Seq(StructField("key", StringType),StructField("value", FloatType))))
case _ => ArrayType(StructType(Seq(StructField("key", StringType),StructField("value", FloatType))))
}
} else { // feature type is not configured
ArrayType(StructType(Seq(StructField("key", StringType),StructField("value", FloatType))))
}
val tensorColumnSchema = ArrayType(StructType(Seq(
StructField("indices0", StringType),
StructField("value", FloatType)
)))
observationDF.withColumn(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName), lit(null).cast(featureColumnType))
val withColumDf = observationDF.withColumn(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName), lit(null))
val featureTypeConfig = missingFeatures(featureName).featureAnchor.featureTypeConfigs.getOrElse(featureName, FeatureTypeConfig.UNDEFINED_TYPE_CONFIG)
FeaturizedDatasetUtils.convertRawDFtoQuinceFDS(withColumDf, Map(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName) ->
(featureName, featureTypeConfig)))
}

val dataframeWithDefaults = substituteDefaults(obsDfWithDefaultNullColumn, missingFeatures.keys.toSeq, defaults, featureTypes,
Expand Down Expand Up @@ -114,20 +98,21 @@ private[offline] class AnchoredFeatureJoinStep(
val AnchorJoinStepInput(observationDF, anchorDFMap) = input
val shouldAddDefault = FeathrUtils.getFeathrJobParam(ctx.sparkSession.sparkContext.getConf,
FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean
val withMissingFeaturesSubstituted = if (shouldAddDefault) {
val (withMissingFeaturesSubstituted, missingFeatures) = if (shouldAddDefault) {
val missingFeatures = features.map(x => x.getFeatureName).filter(x => {
val containsFeature: Seq[Boolean] = anchorDFMap.map(y => y._1.selectedFeatures.contains(x)).toSeq
!containsFeature.contains(true)
})
log.warn(s"Missing data for features ${missingFeatures.mkString}. Default values will be populated for this column.")
SuppressedExceptionHandlerUtils.missingFeatures ++= missingFeatures
val missingAnchoredFeatures = ctx.featureGroups.allAnchoredFeatures.filter(featureName => missingFeatures.contains(featureName._1))
substituteDefaultsForDataMissingFeatures(ctx.sparkSession, observationDF, ctx.logicalPlan,
missingAnchoredFeatures)
} else observationDF
(substituteDefaultsForDataMissingFeatures(ctx.sparkSession, observationDF, ctx.logicalPlan,
missingAnchoredFeatures), missingFeatures)
} else (observationDF, Seq())

val allAnchoredFeatures: Map[String, FeatureAnchorWithSource] = ctx.featureGroups.allAnchoredFeatures
val joinStages = ctx.logicalPlan.joinStages
val joinStages = ctx.logicalPlan.joinStages.map(keyValue => (keyValue._1, keyValue._2.filter(featuresAtThisStage =>
!missingFeatures.contains(featuresAtThisStage))))
val joinOutput = joinStages
.foldLeft(FeatureDataFrame(withMissingFeaturesSubstituted, Map.empty[String, FeatureTypeConfig]))((accFeatureDataFrame, joinStage) => {
val (keyTags: Seq[Int], featureNames: Seq[String]) = joinStage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,14 +281,14 @@ private[offline] class SlidingWindowAggregationJoiner(
val features = x._1.selectedFeatures
emptyFeatures.contains(features.head)
}).flatMap(s => s._1.featureAnchor.defaults) // populate the defaults for features whose data was missing
val userSpecifiedTypesConfig = windowAggAnchorDFThisStage.flatMap(_._1.featureAnchor.featureTypeConfigs)
val userSpecifiedTypesConfig = windowAggAnchorDFMap.flatMap(_._1.featureAnchor.featureTypeConfigs)

// Create a map from the feature name to the column format, ie - RAW or FDS_TENSOR
val featureNameToColumnFormat = allWindowAggFeatures.map (nameToFeatureAnchor => nameToFeatureAnchor._1 ->
nameToFeatureAnchor._2.featureAnchor.extractor
.asInstanceOf[TimeWindowConfigurableAnchorExtractor].features(nameToFeatureAnchor._1).columnFormat)

contextDF = emptyFeatures.foldLeft(contextDF) { (contextDF, featureName) =>
contextDF = emptyFeatures.intersect(joinedFeatures.toSet).foldLeft(contextDF) { (contextDF, featureName) =>
contextDF.withColumn(featureName, lit(null))
}
val FeatureDataFrame(withFDSFeatureDF, inferredTypes) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH
requiredFeatureAnchors: Seq[FeatureAnchorWithSource],
failOnMissingPartition: Boolean): Map[FeatureAnchorWithSource, Option[DataSourceAccessor]] = {
val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean
// val shouldAddDefaultCol = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean
val shouldAddDefaultCol = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean

// get a Map from each source to a list of all anchors based on this source
val sourceToAnchor = requiredFeatureAnchors
Expand Down Expand Up @@ -76,15 +76,15 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH

// If it is a nonTime based source, we will load the dataframe at runtime, however this is too late to decide if the
// feature should be skipped. So, we will try to take the first row here, and see if it succeeds.
if (dataSource.isInstanceOf[NonTimeBasedDataSourceAccessor] && (shouldSkipFeature)) {
if (dataSource.isInstanceOf[NonTimeBasedDataSourceAccessor] && (shouldSkipFeature || shouldAddDefaultCol)) {
if (dataSource.get().take(1).isEmpty) None else {
Some(dataSource)
}
} else {
Some(dataSource)
}
} catch {
case e: Exception => if (shouldSkipFeature) None else throw e
case e: Exception => if (shouldSkipFeature || shouldAddDefaultCol) None else throw e
}
anchorsWithDate.map(anchor => (anchor, timeSeriesSource))
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,11 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
| multiply_a_b: "toNumeric(aa) * toNumeric(bb)"
|
| categorical_b: {
| key: [foo]
| inputs: { foo_b: { key: foo, feature: bb } }
| definition: "toCategorical(foo_b)"
| }
| key: [foo]
| inputs: { foo_b: { key: foo, feature: bb } }
| definition: "toCategorical(foo_b)"
| }
|
|}
""".stripMargin.format(trainingData, featureData)
@BeforeClass
Expand Down Expand Up @@ -390,7 +391,7 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
/*
* Test skipping combination of anchored, derived and swa features. Also, test it with different default value types.
*/
@Ignore
@Test
def testAddDefaultForMissingAnchoredFeatures: Unit = {
setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "true")
val df = runLocalFeatureJoinForTest(
Expand Down Expand Up @@ -546,11 +547,14 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
val featureList = df.data.collect().sortBy(row => if (row.get(0) != null) row.getAs[String]("a_id") else "null")
assertEquals(featureList(0).getAs[Row]("aEmbedding"),
Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(2.0f))))
assertEquals(featureList(0).getAs[Row]("featureWithNull3"), "null")
assertEquals(featureList(0).getAs[Row]("featureWithNull5"), null)
assertEquals(featureList(0).getAs[Row]("featureWithNull3"),
Row(mutable.WrappedArray.make(Array("null")), mutable.WrappedArray.make(Array(1.0f))))
assertEquals(featureList(0).getAs[Row]("featureWithNull5"),
Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(1.0f))))
assertEquals(featureList(0).getAs[Row]("featureWithNull7"), null)
assertEquals(featureList(0).getAs[Row]("featureWithNull"),-1.0f)
assertEquals(featureList(0).getAs[Row]("featureWithNull4"), null)
assertEquals(featureList(0).getAs[Row]("featureWithNull4"),
Row(mutable.WrappedArray.make(Array("key")), mutable.WrappedArray.make(Array(1.0f))))
assertEquals(featureList(0).getAs[Row]("featureWithNull2"),1.0f)
assertEquals(featureList(0).getAs[Row]("derived_featureWithNull"),
Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(-2.0f))))
Expand All @@ -559,8 +563,7 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
assertEquals(featureList(0).getAs[Row]("derived_featureWithNull2"),
Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(2.0f))))
assertEquals(featureList(0).getAs[Row]("featureWithNullSql"), 1.0f)
assertEquals(featureList(0).getAs[Row]("seqJoin_featureWithNull"),
Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(1.0f))))
assertEquals(featureList(0).getAs[Row]("seqJoin_featureWithNull"), -1.0f)
assertEquals(SuppressedExceptionHandlerUtils.missingFeatures,
Set("featureWithNull", "featureWithNull3", "featureWithNull5", "featureWithNull4", "featureWithNull7",
"aEmbedding", "featureWithNull6", "derived_featureWithNull", "seqJoin_featureWithNull", "derived_featureWithNull7"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest {
res.show()
val df = res.collect()(0)
assertEquals(df.getAs[Float]("simplePageViewCount"), 10f)
assertEquals(df.getAs[Float]("simpleFeature"), Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(20.0f))))
assertEquals(df.getAs[Float]("simpleFeature"), 20.0f)
setFeathrJobParam(FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA, "false")
}

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
version=1.0.4-rc12
version=1.0.5-rc1
SONATYPE_AUTOMATIC_RELEASE=true
POM_ARTIFACT_ID=feathr_2.12

0 comments on commit cb6032d

Please sign in to comment.