diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala
index 54d31721a..d7cf748fe 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/client/FeathrClient.scala
@@ -1,7 +1,7 @@
 package com.linkedin.feathr.offline.client
 
 import com.linkedin.feathr.common.exception._
-import com.linkedin.feathr.common.{FeatureInfo, Header, InternalApi, JoiningFeatureParams, RichConfig, TaggedFeatureName}
+import com.linkedin.feathr.common.{FeatureInfo, FeatureTypeConfig, Header, InternalApi, JoiningFeatureParams, RichConfig, TaggedFeatureName}
 import com.linkedin.feathr.offline.config.sources.FeatureGroupsUpdater
 import com.linkedin.feathr.offline.config.{FeathrConfig, FeathrConfigLoader, FeatureGroupsGenerator, FeatureJoinConfig}
 import com.linkedin.feathr.offline.generation.{DataFrameFeatureGenerator, FeatureGenKeyTagAnalyzer, StreamingFeatureGenerator}
@@ -12,8 +12,10 @@ import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
 import com.linkedin.feathr.offline.source.DataSource
 import com.linkedin.feathr.offline.source.accessor.DataPathHandler
 import com.linkedin.feathr.offline.swa.SWAHandler
+import com.linkedin.feathr.offline.transformation.DataFrameDefaultValueSubstituter.substituteDefaults
 import com.linkedin.feathr.offline.util._
 import org.apache.logging.log4j.LogManager
+import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
@@ -310,6 +312,8 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:
     var logicalPlan = logicalPlanner.getLogicalPlan(updatedFeatureGroups, keyTaggedFeatures)
     val shouldSkipFeature = FeathrUtils.getFeathrJobParam(sparkSession.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean
+    val shouldAddDefault = FeathrUtils.getFeathrJobParam(sparkSession.sparkContext.getConf, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean
+    var leftRenamed = left
     val featureToPathsMap = (for {
       requiredFeature <- logicalPlan.allRequiredFeatures
       featureAnchorWithSource <- allAnchoredFeatures.get(requiredFeature.getFeatureName)
@@ -323,6 +327,8 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:
       val featureGroupsWithoutInvalidFeatures = FeatureGroupsUpdater()
         .getUpdatedFeatureGroupsWithoutInvalidPaths(featureToPathsMap, updatedFeatureGroups, featurePathsTest._2)
       logicalPlanner.getLogicalPlan(featureGroupsWithoutInvalidFeatures, keyTaggedFeatures)
+    } else if (shouldAddDefault) {
+      // Don't throw an error if this flag is set; the missing data will be handled at a later step.
     } else {
       throw new FeathrInputDataException(
         ErrorLabel.FEATHR_USER_ERROR,
@@ -358,7 +364,6 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:
         val renameFeatures = conflictsAutoCorrectionSetting.get.renameFeatureList
         val suffix = conflictsAutoCorrectionSetting.get.suffix
         log.warn(s"Found conflicted field names: ${conflictFeatureNames}. Will auto correct them by applying suffix: ${suffix}")
-        var leftRenamed = left
         conflictFeatureNames.foreach(name => {
           leftRenamed = leftRenamed.withColumnRenamed(name, name+'_'+suffix)
         })
@@ -369,7 +374,8 @@ class FeathrClient private[offline] (sparkSession: SparkSession, featureGroups:
             s"Failed to apply auto correction to solve conflicts. Still got conflicts after applying provided suffix ${suffix} for fields: " +
               s"${conflictFeatureNames}. Please provide another suffix or solve conflicts manually.")
         }
-    val (df, header) = joiner.joinFeaturesAsDF(sparkSession, joinConfig, updatedFeatureGroups, keyTaggedFeatures, leftRenamed, rowBloomFilterThreshold)
+    val (df, header) = joiner.joinFeaturesAsDF(sparkSession, joinConfig, updatedFeatureGroups, keyTaggedFeatures, leftRenamed,
+      rowBloomFilterThreshold)
     if(renameFeatures) {
       log.warn(s"Suffix :${suffix} is applied into feature names: ${conflictFeatureNames} to avoid conflicts in outputs")
       renameFeatureNames(df, header, conflictFeatureNames, suffix)
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala
index 578de839d..904681fe8 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/join/workflow/AnchoredFeatureJoinStep.scala
@@ -1,7 +1,7 @@
 package com.linkedin.feathr.offline.join.workflow
 
 import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrFeatureJoinException}
-import com.linkedin.feathr.common.{ErasedEntityTaggedFeature, FeatureTypeConfig}
+import com.linkedin.feathr.common.{ErasedEntityTaggedFeature, FeatureTypeConfig, FeatureTypes}
 import com.linkedin.feathr.offline
 import com.linkedin.feathr.offline.FeatureDataFrame
 import com.linkedin.feathr.offline.anchored.feature.FeatureAnchorWithSource
@@ -12,15 +12,16 @@ import com.linkedin.feathr.offline.job.KeyedTransformedResult
 import com.linkedin.feathr.offline.join._
 import com.linkedin.feathr.offline.join.algorithms._
 import com.linkedin.feathr.offline.join.util.FrequentItemEstimatorFactory
+import com.linkedin.feathr.offline.logical.{LogicalPlan, MultiStageJoinPlan}
 import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
 import com.linkedin.feathr.offline.source.accessor.DataSourceAccessor
 import com.linkedin.feathr.offline.transformation.DataFrameDefaultValueSubstituter.substituteDefaults
 import com.linkedin.feathr.offline.transformation.DataFrameExt._
-import com.linkedin.feathr.offline.util.{DataFrameUtils, FeathrUtils}
+import com.linkedin.feathr.offline.util.{DataFrameUtils, FeathrUtils, FeaturizedDatasetUtils}
 import com.linkedin.feathr.offline.util.FeathrUtils.shouldCheckPoint
 import org.apache.logging.log4j.LogManager
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.functions.{col, lit}
 
 /**
  * An abstract class provides default implementation of anchored feature join step
@@ -39,8 +40,64 @@ private[offline] class AnchoredFeatureJoinStep(
     extends FeatureJoinStep[AnchorJoinStepInput, DataFrameJoinStepOutput] {
   @transient lazy val log = LogManager.getLogger(getClass.getName)
 
+  /**
+   * When the add.default.col.for.missing.data flag is turned on, some features may be skipped because of missing data.
+   * For such anchored features, we will add a feature column with the configured default value (if present in the feature anchor) or
+   * a null-valued column.
+   * @param sparkSession spark session
+   * @param dataframe the original observation dataframe
+   * @param logicalPlan logical plan generated using the join config
+   * @param missingFeatures Map of missing feature names to the corresponding featureAnchorWithSource object.
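+   *                        These are the anchored features whose source data could not be loaded; when the flag is on,
+   *                        AnchorToDataSourceMapper returns no accessor for them instead of failing the job.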
+   * @return Dataframe with the missing feature columns added
+   */
+  def substituteDefaultsForDataMissingFeatures(sparkSession: SparkSession, dataframe: DataFrame, logicalPlan: MultiStageJoinPlan,
+      missingFeatures: Map[String, FeatureAnchorWithSource]): DataFrame = {
+    // Create a map from feature name to its configured default value. If a feature does not have a default value, it will be
+    // missing from this map, and we will add a column of nulls for it.
+    val defaults = missingFeatures.flatMap(s => s._2.featureAnchor.defaults)
+
+    // Create a map from each feature to its feature type, if configured.
+    val featureTypes = missingFeatures
+      .map(x => Some(x._2.featureAnchor.featureTypeConfigs))
+      .foldLeft(Map.empty[String, FeatureTypeConfig])((a, b) => a ++ b.getOrElse(Map.empty[String, FeatureTypeConfig]))
+
+    // We try to guess the column data type from the configured feature type. If the feature type is not present, we fall back to
+    // the default Feathr behavior of returning a map column of string to float (e.g. a missing NUMERIC feature becomes a null float column).
+    val obsDfWithDefaultNullColumn = missingFeatures.keys.foldLeft(dataframe) { (observationDF, featureName) =>
+      val featureColumnType = if (featureTypes.contains(featureName)) {
+        featureTypes(featureName).getFeatureType match {
+          case FeatureTypes.NUMERIC => "float"
+          case FeatureTypes.BOOLEAN => "boolean"
+          case FeatureTypes.DENSE_VECTOR => "array<float>"
+          case FeatureTypes.CATEGORICAL => "string"
+          case FeatureTypes.CATEGORICAL_SET => "array<string>"
+          case FeatureTypes.TERM_VECTOR => "map<string,float>"
+          case FeatureTypes.UNSPECIFIED => "map<string,float>"
+          case _ => "map<string,float>"
+        }
+      } else { // feature type is not configured
+        "map<string,float>"
+      }
+      observationDF.withColumn(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName), lit(null).cast(featureColumnType))
+    }
+
+    val dataframeWithDefaults = substituteDefaults(obsDfWithDefaultNullColumn, missingFeatures.keys.toSeq, defaults, featureTypes,
+      sparkSession, (s: String) => s"${FEATURE_NAME_PREFIX}$s")
+
+    // Duplicate each column under the Feathr-internal feature column name, which is required for further processing.
+    // For example, if the feature name is abc and the corresponding key is x, the column name would be __feathr_feature_abc_x.
+    // This column will be dropped after all the joins are done.
+    missingFeatures.keys.foldLeft(dataframeWithDefaults) { (accDF, featureName) =>
+      val keyTags = logicalPlan.joinStages.filter(kv => kv._2.contains(featureName)).head._1
+      val keyStr = keyTags.map(logicalPlan.keyTagIntsToStrings).toList
+      accDF.withColumn(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName, Some(keyStr)),
+        col(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName)))
+    }
+  }
+
   /**
    * Join anchored features to the observation passed as part of the input context.
+   *
    * @param features Non-window aggregation, basic anchored features.
    * @param input input context for this step.
    * @param ctx environment variable that contains join job execution context.
@@ -49,10 +106,22 @@ private[offline] class AnchoredFeatureJoinStep(
   override def joinFeatures(features: Seq[ErasedEntityTaggedFeature], input: AnchorJoinStepInput)(
       implicit ctx: JoinExecutionContext): FeatureDataFrameOutput = {
     val AnchorJoinStepInput(observationDF, anchorDFMap) = input
+    val shouldAddDefault = FeathrUtils.getFeathrJobParam(ctx.sparkSession.sparkContext.getConf,
+      FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean
+    val withMissingFeaturesSubstituted = if (shouldAddDefault) {
+      val missingFeatures = features.map(x => x.getFeatureName).filter(x => {
+        val containsFeature: Seq[Boolean] = anchorDFMap.map(y => y._1.selectedFeatures.contains(x)).toSeq
+        !containsFeature.contains(true)
+      })
+      val missingAnchoredFeatures = ctx.featureGroups.allAnchoredFeatures.filter(featureName => missingFeatures.contains(featureName._1))
+      substituteDefaultsForDataMissingFeatures(ctx.sparkSession, observationDF, ctx.logicalPlan,
+        missingAnchoredFeatures)
+    } else observationDF
+
     val allAnchoredFeatures: Map[String, FeatureAnchorWithSource] = ctx.featureGroups.allAnchoredFeatures
     val joinStages = ctx.logicalPlan.joinStages
     val joinOutput = joinStages
-      .foldLeft(FeatureDataFrame(observationDF, Map.empty[String, FeatureTypeConfig]))((accFeatureDataFrame, joinStage) => {
+      .foldLeft(FeatureDataFrame(withMissingFeaturesSubstituted, Map.empty[String, FeatureTypeConfig]))((accFeatureDataFrame, joinStage) => {
         val (keyTags: Seq[Int], featureNames: Seq[String]) = joinStage
         val FeatureDataFrame(contextDF, inferredFeatureTypeMap) = accFeatureDataFrame
         // map feature name to its transformed dataframe and the join key of the dataframe
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala
index 5aeeeac0a..bae2c64ef 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/transformation/AnchorToDataSourceMapper.scala
@@ -37,6 +37,8 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH
       requiredFeatureAnchors: Seq[FeatureAnchorWithSource],
       failOnMissingPartition: Boolean): Map[FeatureAnchorWithSource, Option[DataSourceAccessor]] = {
     val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean
+    val shouldAddDefaultCol = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean
+
     // get a Map from each source to a list of all anchors based on this source
     val sourceToAnchor = requiredFeatureAnchors
       .map(anchor => (anchor.source, anchor))
@@ -74,7 +76,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH
           // If it is a nonTime based source, we will load the dataframe at runtime, however this is too late to decide if the
           // feature should be skipped. So, we will try to take the first row here, and see if it succeeds.
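+          // An empty take(1) means the source has no data; returning None treats the anchor as missing, so its features are
+          // either skipped or, when add.default.col.for.missing.data is on, joined later as default-valued columns.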
-          if (dataSource.isInstanceOf[NonTimeBasedDataSourceAccessor] && shouldSkipFeature) {
+          if (dataSource.isInstanceOf[NonTimeBasedDataSourceAccessor] && (shouldSkipFeature || shouldAddDefaultCol)) {
             if (dataSource.get().take(1).isEmpty) None
             else {
               Some(dataSource)
             }
@@ -82,7 +84,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathH
             Some(dataSource)
           }
         } catch {
-          case e: Exception => if (shouldSkipFeature) None else throw e
+          case e: Exception => if (shouldSkipFeature || shouldAddDefaultCol) None else throw e
         }
       anchorsWithDate.map(anchor => (anchor, timeSeriesSource))
diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala
index 15509171b..132c36594 100644
--- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala
+++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala
@@ -7,11 +7,11 @@ import com.linkedin.feathr.offline.generation.SparkIOUtils
 import com.linkedin.feathr.offline.job.PreprocessedDataFrameManager
 import com.linkedin.feathr.offline.source.dataloader.{AvroJsonDataLoader, CsvDataLoader}
 import com.linkedin.feathr.offline.util.FeathrTestUtils
-import com.linkedin.feathr.offline.util.FeathrUtils.{SKIP_MISSING_FEATURE, setFeathrJobParam}
+import com.linkedin.feathr.offline.util.FeathrUtils.{ADD_DEFAULT_COL_FOR_MISSING_DATA, SKIP_MISSING_FEATURE, setFeathrJobParam}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types._
-import org.testng.Assert.assertTrue
+import org.testng.Assert.{assertEquals, assertTrue}
 import org.testng.annotations.{BeforeClass, Test}
 
 import scala.collection.mutable
@@ -386,6 +386,153 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
     setFeathrJobParam(SKIP_MISSING_FEATURE, "false")
   }
 
+  /*
+   * Test adding default columns for a combination of missing anchored, derived and SWA features, with different default
+   * value types. The source paths "generaion/daily" and "nullVaueSource.avro.json" are intentionally misspelled so that
+   * their data is missing and the configured defaults (or nulls) are returned instead.
+   */
+  @Test
+  def testAddDefaultForMissingAnchoredFeatures(): Unit = {
+    setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "true")
+    val df = runLocalFeatureJoinForTest(
+      joinConfigAsString =
+        """
+          |settings: {
+          |  joinTimeSettings: {
+          |    timestampColumn: {
+          |      def: "timestamp"
+          |      format: "yyyy-MM-dd"
+          |    }
+          |    simulateTimeDelay: 1d
+          |  }
+          |}
+          |
+          |features: {
+          |  key: a_id
+          |  featureList: ["featureWithNull", "derived_featureWithNull", "featureWithNull2", "featureWithNull3", "featureWithNull4",
+          |  "featureWithNull5", "derived_featureWithNull2", "featureWithNull6", "featureWithNull7", "derived_featureWithNull7"
+          |  "aEmbedding", "memberEmbeddingAutoTZ"]
+          |}
+        """.stripMargin,
+      featureDefAsString =
+        """
+          |sources: {
+          |  swaSource: {
+          |    location: { path: "generaion/daily" }
+          |    timePartitionPattern: "yyyy/MM/dd"
+          |    timeWindowParameters: {
+          |      timestampColumn: "timestamp"
+          |      timestampColumnFormat: "yyyy-MM-dd"
+          |    }
+          |  }
+          |  swaSource1: {
+          |    location: { path: "generation/daily" }
+          |    timePartitionPattern: "yyyy/MM/dd"
+          |    timeWindowParameters: {
+          |      timestampColumn: "timestamp"
+          |      timestampColumnFormat: "yyyy-MM-dd"
+          |    }
+          |  }
+          |}
+          |
+          |anchors: {
+          |  anchor1: {
+          |    source: "anchorAndDerivations/nullVaueSource.avro.json"
+          |    key: "toUpperCaseExt(mId)"
+          |    features: {
+          |      featureWithNull: {
+          |        def: "isPresent(value) ? toNumeric(value) : 0"
+          |        type: NUMERIC
+          |        default: -1
+          |      }
+          |      featureWithNull3: {
toNumeric(value) : 0" + | type: CATEGORICAL + | default: "null" + | } + | featureWithNull7: { + | def: "isPresent(value) ? toNumeric(value) : 0" + | } + | featureWithNull4: { + | def: "isPresent(value) ? toNumeric(value) : 0" + | type: TERM_VECTOR + | default: {} + | } + | featureWithNull6: { + | def: "isPresent(value) ? toNumeric(value) : 0" + | type: DENSE_VECTOR + | default: [1, 2, 3] + | } + | featureWithNull5: { + | def: "isPresent(value) ? toNumeric(value) : 0" + | default: 1 + | } + | } + | } + | + | anchor2: { + | source: "anchorAndDerivations/nullValueSource.avro.json" + | key: "toUpperCaseExt(mId)" + | features: { + | featureWithNull2: "isPresent(value) ? toNumeric(value) : 0" + | } + | } + | swaAnchor: { + | source: "swaSource" + | key: "x" + | features: { + | aEmbedding: { + | def: "embedding" + | aggregation: LATEST + | window: 3d + | default: 2 + | } + | } + | } + | swaAnchor1: { + | source: "swaSource1" + | key: "x" + | features: { + | memberEmbeddingAutoTZ: { + | def: "embedding" + | aggregation: LATEST + | window: 3d + | type: { + | type: TENSOR + | tensorCategory: SPARSE + | dimensionType: [INT] + | valType: FLOAT + | } + | } + | } + | } + |} + |derivations: { + | + | derived_featureWithNull: "featureWithNull * 2" + | derived_featureWithNull2: "featureWithNull2 * 2" + | derived_featureWithNull7: "featureWithNull7 * 2" + |} + """.stripMargin, + observationDataPath = "anchorAndDerivations/testMVELLoopExpFeature-observations.csv") + + df.data.show() + val featureList = df.data.collect().sortBy(row => if (row.get(0) != null) row.getAs[String]("a_id") else "null") + assertEquals(featureList(0).getAs[Row]("aEmbedding"), + Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(2.0f)))) + assertEquals(featureList(0).getAs[Row]("featureWithNull3"), "null") + assertEquals(featureList(0).getAs[Row]("featureWithNull5"), mutable.Map("" -> 1.0f)) + assertEquals(featureList(0).getAs[Row]("featureWithNull7"), null) + assertEquals(featureList(0).getAs[Row]("featureWithNull"),-1.0f) + assertEquals(featureList(0).getAs[Row]("featureWithNull4"),Map()) + assertEquals(featureList(0).getAs[Row]("featureWithNull2"),1.0f) + assertEquals(featureList(0).getAs[Row]("derived_featureWithNull"), + Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(-2.0f)))) + assertEquals(featureList(0).getAs[Row]("derived_featureWithNull7"), + Row(mutable.WrappedArray.make(Array()), mutable.WrappedArray.empty)) + assertEquals(featureList(0).getAs[Row]("derived_featureWithNull2"), + Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(2.0f)))) + setFeathrJobParam(ADD_DEFAULT_COL_FOR_MISSING_DATA, "false") + } + /* * Test features with fdsExtract. */ diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala index f7ef08739..aa7e53377 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala @@ -638,10 +638,6 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { setFeathrJobParam(SKIP_MISSING_FEATURE, "false") } - /** - * SWA test with add default col for missing data flag turned on. 
-   */
-  @Test
   def testSWAWithMissingFeatureDataFlag(): Unit = {
     setFeathrJobParam(FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA, "true")
     val joinConfigAsString =
@@ -668,7 +664,7 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest {
        |   featureList: ["simplePageViewCount", "simpleFeature", "derived_simpleFeature"]
        | }
        |]
-      """.stripMargin
+        """.stripMargin
     val featureDefAsString =
       """
        |sources: {
@@ -721,27 +717,12 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest {
        |derivations: {
        |   derived_simpleFeature: simpleFeature
        |}
-      """.stripMargin
-    val joinConfig = FeatureJoinConfig.parseJoinConfig(joinConfigAsString)
-    val feathrClient = FeathrClient.builder(ss).addFeatureDef(featureDefAsString).build()
-    val outputPath: String = FeatureJoinJob.SKIP_OUTPUT
-
-    val defaultParams = Array(
-      "--local-mode",
-      "--feathr-config",
-      "",
-      "--output",
-      outputPath)
-
-    val jobContext = FeatureJoinJob.parseInputArgument(defaultParams).jobJoinContext
-    val observationDataPath = "slidingWindowAgg/localAnchorTestObsData.avro.json"
-    val obsDf = loadObservationAsFDS(ss, observationDataPath, List())
-    val res = feathrClient.joinFeaturesWithSuppressedExceptions(joinConfig, obsDf, jobContext)
-    val df = res._1.data.collect()(0)
-    res._1.data.show()
+        """.stripMargin
+    val res = runLocalFeatureJoinForTest(joinConfigAsString, featureDefAsString,
+      observationDataPath = "slidingWindowAgg/localAnchorTestObsData.avro.json").data
+    res.show()
+    val df = res.collect()(0)
     assertEquals(df.getAs[Float]("simplePageViewCount"), 10f)
-    assertEquals(df.getAs[Float]("simpleFeature"), Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(20.0f))))
-    assert(res._2(SuppressedExceptionHandlerUtils.MISSING_DATA_EXCEPTION) == "simpleFeature")
+    assertEquals(df.getAs[Float]("simpleFeature"), Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(20.0f))))
     setFeathrJobParam(FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA, "false")
   }
 
diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/join/workflow/TestAnchoredFeatureJoinStep.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/join/workflow/TestAnchoredFeatureJoinStep.scala
index 74a754768..309feaf7c 100644
--- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/join/workflow/TestAnchoredFeatureJoinStep.scala
+++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/join/workflow/TestAnchoredFeatureJoinStep.scala
@@ -43,6 +43,8 @@ class TestAnchoredFeatureJoinStep extends TestFeathr with MockitoSugar {
       .thenReturn("false")
     when(mockSparkContext.getConf).thenReturn(mockSparkConf)
     when(mockSparkSession.sparkContext).thenReturn(mockSparkContext)
+    when(mockSparkConf.get(s"${FeathrUtils.FEATHR_PARAMS_PREFIX}${FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA}",
+      "false")).thenReturn("false")
     when(mockExecutionContext.sparkSession).thenReturn(mockSparkSession)
     when(mockLogicalPlan.joinStages).thenReturn(Seq.empty)
 
diff --git a/gradle.properties b/gradle.properties
index 2205f1dc5..76dbde675 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,3 +1,3 @@
-version=1.0.2-rc11
+version=1.0.3-rc1
 SONATYPE_AUTOMATIC_RELEASE=true
 POM_ARTIFACT_ID=feathr_2.12
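Reviewer note: a minimal sketch (not part of the patch) of how a job opts in to the new behavior. It assumes the full
Spark conf key is FeathrUtils.FEATHR_PARAMS_PREFIX + FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA, as constructed in the
TestAnchoredFeatureJoinStep mock above; the "spark.feathr." prefix literal below is an assumption.

    import org.apache.spark.sql.SparkSession

    // Hypothetical job setup: with the flag on, anchors whose source data cannot be loaded
    // no longer fail the join; their features are returned as default-valued (or null)
    // columns, matching the assertions in testAddDefaultForMissingAnchoredFeatures.
    val ss = SparkSession
      .builder()
      .appName("feathr-join-with-defaults")
      .config("spark.feathr.add.default.col.for.missing.data", "true") // assumed literal key
      .getOrCreate()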