Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attempt to fix col type issues #1182

Merged
merged 3 commits
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.linkedin.feathr.offline.util.FeathrUtils.shouldCheckPoint
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{ArrayType, BooleanType, FloatType, StringType, StructField, StructType}

/**
* An abstract class provides default implementation of anchored feature join step
Expand Down Expand Up @@ -66,18 +67,23 @@ private[offline] class AnchoredFeatureJoinStep(
val obsDfWithDefaultNullColumn = missingFeatures.keys.foldLeft(dataframe) { (observationDF, featureName) =>
val featureColumnType = if (featureTypes.contains(featureName)) {
featureTypes(featureName).getFeatureType match {
case FeatureTypes.NUMERIC => "float"
case FeatureTypes.BOOLEAN => "boolean"
case FeatureTypes.DENSE_VECTOR => "array<float>"
case FeatureTypes.CATEGORICAL => "string"
case FeatureTypes.CATEGORICAL_SET => "array<string>"
case FeatureTypes.TERM_VECTOR => "map<string,float>"
case FeatureTypes.UNSPECIFIED => "map<string,float>"
case _ => "map<string,float>"
case FeatureTypes.NUMERIC => FloatType
case FeatureTypes.BOOLEAN => BooleanType
case FeatureTypes.DENSE_VECTOR => ArrayType(FloatType)
case FeatureTypes.CATEGORICAL => StringType
case FeatureTypes.CATEGORICAL_SET => ArrayType(StringType)
case FeatureTypes.TERM_VECTOR => ArrayType(StructType(Seq(StructField("key", StringType), StructField("value", FloatType))))
case FeatureTypes.TENSOR => ArrayType(StructType(Seq(StructField("key", StringType),StructField("value", FloatType))))
case FeatureTypes.UNSPECIFIED => ArrayType(StructType(Seq(StructField("key", StringType),StructField("value", FloatType))))
case _ => ArrayType(StructType(Seq(StructField("key", StringType),StructField("value", FloatType))))
}
} else { // feature type is not configured
"map<string,float>"
ArrayType(StructType(Seq(StructField("key", StringType),StructField("value", FloatType))))
}
val tensorColumnSchema = ArrayType(StructType(Seq(
StructField("indices0", StringType),
StructField("value", FloatType)
)))
observationDF.withColumn(DataFrameColName.genFeatureColumnName(FEATURE_NAME_PREFIX + featureName), lit(null).cast(featureColumnType))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package com.linkedin.feathr.offline.transformation
import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrException, FeathrFeatureTransformationException}
import com.linkedin.feathr.common.{FeatureTypes, FeatureValue}
import com.linkedin.feathr.offline.transformation.FeatureColumnFormat._
import com.linkedin.feathr.offline.util.FeathrUtils
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.typedLit
import org.apache.spark.sql.functions.{lit, typedLit}
import org.apache.spark.sql.types._

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -51,6 +52,7 @@ private[offline] object FeatureValueToFDSColumnConverter extends FeatureValueToC
private[transformation] object FeatureValueToRawColumnConverter extends FeatureValueToColumnConverter {

override def convert(featureName: String, defaultValue: FeatureValue, targetDataType: DataType, featureType: FeatureTypes): Column = {

targetDataType match {
// If the field is StringType, treat it is as CATEGORICAL and get the "term".
case _: StringType =>
Expand Down Expand Up @@ -116,10 +118,7 @@ private[transformation] object FeatureValueToRawColumnConverter extends FeatureV
case _: StringType =>
typedLit(defaultValue.getAsTermVector.asScala.keys.toSeq)
case eType =>
throw new FeathrFeatureTransformationException(
ErrorLabel.FEATHR_USER_ERROR,
s"Cannot convert Array of {$eType} to name-term-value FeatureValue," +
s" only array of float/double/string/int is supported")
lit(null)
}
case fType =>
throw new FeathrFeatureTransformationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,15 @@ private[offline] object AclCheckUtils {
} yield featureAnchorWithSource.source.path

val shouldSkipFeature = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.SKIP_MISSING_FEATURE).toBoolean
val shouldAddDefaultCol = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean
log.info(s"THE VALUE OF SHOULDSKIPFEATURE is ${shouldSkipFeature}")
// val shouldAddDefaultCol = FeathrUtils.getFeathrJobParam(ss.sparkContext.getConf, FeathrUtils.ADD_DEFAULT_COL_FOR_MISSING_DATA).toBoolean
val invalidPaths = AclCheckUtils.checkReadAuthorization(conf, allRequiredPaths.distinct)
if (invalidPaths.isEmpty) {
(Success(()), invalidPaths.map(_._2))
} else {
if (!shouldSkipFeature) {
if (!shouldSkipFeature && !shouldAddDefaultCol) {
log.info(s"THE VALUE OF SHOULDSKIPFEATURE is ${shouldSkipFeature} AND THE VALUE OF SHOULDADDDEFAULT IS ${shouldAddDefaultCol}.")
(Failure(
new RuntimeException(
"Can not verify read authorization on the following paths. This can be due to" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,8 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
|
| features: {
| key: a_id
| featureList: ["featureWithNull", "derived_featureWithNull", "featureWithNull2", "featureWithNull3", "featureWithNull4",
| "featureWithNull5", "derived_featureWithNull2", "featureWithNull6", "featureWithNull7", "derived_featureWithNull7"
| featureList: ["featureWithNull4", "featureWithNull", "derived_featureWithNull", "featureWithNull2", "featureWithNull3",
| "derived_featureWithNull2", "featureWithNull5", "featureWithNull7", "featureWithNull6", "derived_featureWithNull7"
| "aEmbedding", "memberEmbeddingAutoTZ", "aEmbedding", "featureWithNullSql", "seqJoin_featureWithNull"]
| }
""".stripMargin,
Expand Down Expand Up @@ -466,7 +466,7 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
| featureWithNull4: {
| def: "isPresent(value) ? toNumeric(value) : 0"
| type: TERM_VECTOR
| default: {}
| default: {"key": 1}
| }
| featureWithNull6: {
| def: "isPresent(value) ? toNumeric(value) : 0"
Expand Down Expand Up @@ -534,7 +534,7 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
| key: x
| join: {
| base: {key: x, feature: featureWithNull2}
| expansion: {key: y, feature: featureWithNull5}
| expansion: {key: y, feature: featureWithNull}
| }
| aggregation: "SUM"
| }
Expand All @@ -547,10 +547,10 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
assertEquals(featureList(0).getAs[Row]("aEmbedding"),
Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(2.0f))))
assertEquals(featureList(0).getAs[Row]("featureWithNull3"), "null")
assertEquals(featureList(0).getAs[Row]("featureWithNull5"), mutable.Map("" -> 1.0f))
assertEquals(featureList(0).getAs[Row]("featureWithNull5"), null)
assertEquals(featureList(0).getAs[Row]("featureWithNull7"), null)
assertEquals(featureList(0).getAs[Row]("featureWithNull"),-1.0f)
assertEquals(featureList(0).getAs[Row]("featureWithNull4"),Map())
assertEquals(featureList(0).getAs[Row]("featureWithNull4"), null)
assertEquals(featureList(0).getAs[Row]("featureWithNull2"),1.0f)
assertEquals(featureList(0).getAs[Row]("derived_featureWithNull"),
Row(mutable.WrappedArray.make(Array("")), mutable.WrappedArray.make(Array(-2.0f))))
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
version=1.0.4-rc10
version=1.0.4-rc11
SONATYPE_AUTOMATIC_RELEASE=true
POM_ARTIFACT_ID=feathr_2.12
Loading