diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala index 7ed3295e0..2bc6a5980 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala @@ -65,26 +65,10 @@ private[offline] class AnchoredFeatureJoinStep( // We try to guess the column data type from the configured feature type. If feature type is not present, we will default to // default feathr behavior of returning a map column of string to float. val obsDfWithDefaultNullColumn = missingFeatures.keys.foldLeft(dataframe) { (observationDF, featureName) => - val featureColumnType = if (featureTypes.contains(featureName)) { - featureTypes(featureName).getFeatureType match { - case FeatureTypes.NUMERIC => FloatType - case FeatureTypes.BOOLEAN => BooleanType - case FeatureTypes.DENSE_VECTOR => ArrayType(FloatType) - case FeatureTypes.CATEGORICAL => StringType - case FeatureTypes.CATEGORICAL_SET => ArrayType(StringType) - case FeatureTypes.TERM_VECTOR => ArrayType(StructType(Seq(StructField("key", StringType), StructField("value", FloatType)))) - case FeatureTypes.TENSOR => ArrayType(StructType(Seq(StructField("key", StringType),StructField("value", FloatType)))) - case FeatureTypes.UNSPECIFIED => ArrayType(StructType(Seq(StructField("key", StringType),StructField("value", FloatType)))) - case _ => ArrayType(StructType(Seq(StructField("key", StringType),StructField("value", FloatType)))) - } - } else { // feature type is not configured - ArrayType(StructType(Seq(StructField("key", StringType),StructField("value", FloatType)))) - } - val tensorColumnSchema = ArrayType(StructType(Seq( - StructField("indices0", StringType), - StructField("value", FloatType) - ))) - observationDF.withColumn(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName), lit(null).cast(featureColumnType)) + val withColumDf = observationDF.withColumn(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName), lit(null)) + val featureTypeConfig = missingFeatures(featureName).featureAnchor.featureTypeConfigs.getOrElse(featureName, FeatureTypeConfig.UNDEFINED_TYPE_CONFIG) + FeaturizedDatasetUtils.convertRawDFtoQuinceFDS(withColumDf, Map(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName) -> + (featureName, featureTypeConfig))) } val dataframeWithDefaults = substituteDefaults(obsDfWithDefaultNullColumn, missingFeatures.keys.toSeq, defaults, featureTypes, @@ -114,7 +98,7 @@ private[offline] class AnchoredFeatureJoinStep( val AnchorJoinStepInput(observationDF, anchorDFMap) = input val shouldAddDefault = FeathrUtils.getFeathrJobParam(ctx.sparkSession.sparkContext.getConf, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean - val withMissingFeaturesSubstituted = if (shouldAddDefault) { + val (withMissingFeaturesSubstituted, missingFeatures) = if (shouldAddDefault) { val missingFeatures = features.map(x => x.getFeatureName).filter(x => { val containsFeature: Seq[Boolean] = anchorDFMap.map(y => y._1.selectedFeatures.contains(x)).toSeq !containsFeature.contains(true) @@ -122,12 +106,13 @@ private[offline] class AnchoredFeatureJoinStep( log.warn(s"Missing data for features ${missingFeatures.mkString}. Default values will be populated for this column.") SuppressedExceptionHandlerUtils.missingFeatures ++= missingFeatures val missingAnchoredFeatures = ctx.featureGroups.allAnchoredFeatures.filter(featureName => missingFeatures.contains(featureName._1)) - substituteDefaultsForDataMissingFeatures(ctx.sparkSession, observationDF, ctx.logicalPlan, - missingAnchoredFeatures) - } else observationDF + (substituteDefaultsForDataMissingFeatures(ctx.sparkSession, observationDF, ctx.logicalPlan, + missingAnchoredFeatures), missingFeatures) + } else (observationDF, Seq()) val allAnchoredFeatures: Map[String, FeatureAnchorWithSource] = ctx.featureGroups.allAnchoredFeatures - val joinStages = ctx.logicalPlan.joinStages + val joinStages = ctx.logicalPlan.joinStages.map(keyValue => (keyValue._1, keyValue._2.filter(featuresAtThisStage => + !missingFeatures.contains(featuresAtThisStage)))) val joinOutput = joinStages .foldLeft(FeatureDataFrame(withMissingFeaturesSubstituted, Map.empty[String, FeatureTypeConfig]))((accFeatureDataFrame, joinStage) => { val (keyTags: Seq[Int], featureNames: Seq[String]) = joinStage diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala index f94e46585..3ff115b5f 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/swa/SlidingWindowAggregationJoiner.scala @@ -281,14 +281,14 @@ private[offline] class SlidingWindowAggregationJoiner( val features = x._1.selectedFeatures emptyFeatures.contains(features.head) }).flatMap(s => s._1.featureAnchor.defaults) // populate the defaults for features whose data was missing - val userSpecifiedTypesConfig = windowAggAnchorDFThisStage.flatMap(_._1.featureAnchor.featureTypeConfigs) + val userSpecifiedTypesConfig = windowAggAnchorDFMap.flatMap(_._1.featureAnchor.featureTypeConfigs) // Create a map from the feature name to the column format, ie - RAW or FDS_TENSOR val featureNameToColumnFormat = allWindowAggFeatures.map (nameToFeatureAnchor => nameToFeatureAnchor._1 -> nameToFeatureAnchor._2.featureAnchor.extractor .asInstanceOf[TimeWindowConfigurableAnchorExtractor].features(nameToFeatureAnchor._1).columnFormat) - contextDF = emptyFeatures.foldLeft(contextDF) { (contextDF, featureName) => + contextDF = emptyFeatures.intersect(joinedFeatures.toSet).foldLeft(contextDF) { (contextDF, featureName) => contextDF.withColumn(featureName, lit(null)) } val FeatureDataFrame(withFDSFeatureDF, inferredTypes) = diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala index d8e4a733a..3f0331da2 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala @@ -37,7 +37,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH requiredFeatureAnchors: Seq[FeatureAnchorWithSource], failOnMissingPartition: Boolean): Map[FeatureAnchorWithSource, Option[DataSourceAccessor]] = { val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean -// val shouldAddDefaultCol = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean + val shouldAddDefaultCol = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean // get a Map from each source to a list of all anchors based on this source val sourceToAnchor = requiredFeatureAnchors @@ -76,7 +76,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH // If it is a nonTime based source, we will load the dataframe at runtime, however this is too late to decide if the // feature should be skipped. So, we will try to take the first row here, and see if it succeeds. - if (dataSource.isInstanceOf[NonTimeBasedDataSourceAccessor] && (shouldSkipFeature)) { + if (dataSource.isInstanceOf[NonTimeBasedDataSourceAccessor] && (shouldSkipFeature || shouldAddDefaultCol)) { if (dataSource.get().take(1).isEmpty) None else { Some(dataSource) } @@ -84,7 +84,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH Some(dataSource) } } catch { - case e: Exception => if (shouldSkipFeature) None else throw e + case e: Exception => if (shouldSkipFeature || shouldAddDefaultCol) None else throw e } anchorsWithDate.map(anchor => (anchor, timeSeriesSource)) }) diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala index 0b109717e..8fa7192f0 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala @@ -137,10 +137,11 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { | multiply_a_b: "toNumeric(aa) * toNumeric(bb)" | | categorical_b: { - | key: [foo] - | inputs: { foo_b: { key: foo, feature: bb } } - | definition: "toCategorical(foo_b)" - | } + | key: [foo] + | inputs: { foo_b: { key: foo, feature: bb } } + | definition: "toCategorical(foo_b)" + | } + | |} """.stripMargin.format(trainingData, featureData) @BeforeClass @@ -390,7 +391,7 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { /* * Test skipping combination of anchored, derived and swa features. Also, test it with different default value types. */ - @Ignore + @Test def testAddDefaultForMissingAnchoredFeatures: Unit = { setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "true") val df = runLocalFeatureJoinForTest( @@ -546,11 +547,14 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { val featureList = df.data.collect().sortBy(row => if (row.get(0) != null) row.getAs[String]("a_id") else "null") assertEquals(featureList(0).getAs[Row]("aEmbedding"), Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(2.0f)))) - assertEquals(featureList(0).getAs[Row]("featureWithNull3"), "null") - assertEquals(featureList(0).getAs[Row]("featureWithNull5"), null) + assertEquals(featureList(0).getAs[Row]("featureWithNull3"), + Row(mutable.WrappedArray.make(Array("null")), mutable.WrappedArray.make(Array(1.0f)))) + assertEquals(featureList(0).getAs[Row]("featureWithNull5"), + Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(1.0f)))) assertEquals(featureList(0).getAs[Row]("featureWithNull7"), null) assertEquals(featureList(0).getAs[Row]("featureWithNull"),-1.0f) - assertEquals(featureList(0).getAs[Row]("featureWithNull4"), null) + assertEquals(featureList(0).getAs[Row]("featureWithNull4"), + Row(mutable.WrappedArray.make(Array("key")), mutable.WrappedArray.make(Array(1.0f)))) assertEquals(featureList(0).getAs[Row]("featureWithNull2"),1.0f) assertEquals(featureList(0).getAs[Row]("derived_featureWithNull"), Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(-2.0f)))) @@ -559,8 +563,7 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { assertEquals(featureList(0).getAs[Row]("derived_featureWithNull2"), Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(2.0f)))) assertEquals(featureList(0).getAs[Row]("featureWithNullSql"), 1.0f) - assertEquals(featureList(0).getAs[Row]("seqJoin_featureWithNull"), - Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(1.0f)))) + assertEquals(featureList(0).getAs[Row]("seqJoin_featureWithNull"), -1.0f) assertEquals(SuppressedExceptionHandlerUtils.missingFeatures, Set("featureWithNull", "featureWithNull3", "featureWithNull5", "featureWithNull4", "featureWithNull7", "aEmbedding", "featureWithNull6", "derived_featureWithNull", "seqJoin_featureWithNull", "derived_featureWithNull7")) diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala index 153fe3cf5..7485b2af4 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala @@ -1055,7 +1055,7 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { res.show() val df = res.collect()(0) assertEquals(df.getAs[Float]("simplePageViewCount"), 10f) - assertEquals(df.getAs[Float]("simpleFeature"), Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(20.0f)))) + assertEquals(df.getAs[Float]("simpleFeature"), 20.0f) setFeathrJobParam(FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA, "false") } diff --git a/gradle.properties b/gradle.properties index 8d7091664..6c579938f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=1.0.4-rc12 +version=1.0.5-rc1 SONATYPE_AUTOMATIC_RELEASE=true POM_ARTIFACT_ID=feathr_2.12