From 6c439e2a749639a717db01515c955282f53eabab Mon Sep 17 00:00:00 2001 From: Anirudh Agarwal Date: Wed, 5 Apr 2023 14:58:08 -0700 Subject: [PATCH] Revert mvel log (#1140) * Revert "Allow alien value in MVEL-based derivations (#1120) and remove stdout statements" This reverts commit 55290e7824c217bd563447ff2df04ed238e5dd4f. * updating rc version after last commit --------- Co-authored-by: Anirudh Agarwal --- .../common/FeatureVariableResolver.java | 25 ++++++------------- .../offline/PostTransformationUtil.scala | 2 +- .../MvelFeatureDerivationFunction.scala | 2 +- .../MvelFeatureDerivationFunction1.scala | 2 +- .../SimpleMvelDerivationFunction.scala | 2 +- .../transformation/AnchorUDFOperator.scala | 2 -- .../TransformationOperatorUtils.scala | 2 -- .../feathr/offline/job/FeatureGenJob.scala | 2 -- .../feathr/offline/job/FeatureJoinJob.scala | 2 -- .../offline/job/FeatureTransformation.scala | 2 -- .../mvel/FeatureVariableResolverFactory.scala | 7 ++---- .../FeathrExpressionExecutionContext.scala | 22 ---------------- .../offline/util/CoercionUtilsScala.scala | 1 - .../feathr/offline/TestFeathrUdfPlugins.scala | 6 ++++- gradle.properties | 2 +- 15 files changed, 19 insertions(+), 62 deletions(-) diff --git a/feathr-impl/src/main/java/com/linkedin/feathr/common/FeatureVariableResolver.java b/feathr-impl/src/main/java/com/linkedin/feathr/common/FeatureVariableResolver.java index bb84152ca..3dd6718f9 100644 --- a/feathr-impl/src/main/java/com/linkedin/feathr/common/FeatureVariableResolver.java +++ b/feathr-impl/src/main/java/com/linkedin/feathr/common/FeatureVariableResolver.java @@ -4,12 +4,8 @@ import com.linkedin.feathr.common.tensor.TensorIterator; import com.linkedin.feathr.common.types.ValueType; import com.linkedin.feathr.common.util.CoercionUtils; -import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext; -import org.mvel2.DataConversion; import org.mvel2.integration.impl.SimpleValueResolver; -import java.util.Optional; - /** * FeatureVariableResolver takes a FeatureValue object for member variable during MVEL expression evaluation, @@ -17,11 +13,10 @@ */ public class FeatureVariableResolver extends SimpleValueResolver { private FeatureValue _featureValue; - private Optional _mvelContext = Optional.empty(); - public FeatureVariableResolver(FeatureValue featureValue, Optional mvelContext) { + + public FeatureVariableResolver(FeatureValue featureValue) { super(featureValue); _featureValue = featureValue; - _mvelContext = mvelContext; } @Override @@ -30,27 +25,21 @@ public Object getValue() { return null; } - Object fv = null; switch (_featureValue.getFeatureType().getBasicType()) { case NUMERIC: - fv = _featureValue.getAsNumeric(); break; + return _featureValue.getAsNumeric(); case TERM_VECTOR: - fv = getValueFromTermVector(); break; + return getValueFromTermVector(); case BOOLEAN: case CATEGORICAL: case CATEGORICAL_SET: case DENSE_VECTOR: case TENSOR: - fv = getValueFromTensor(); break; + return getValueFromTensor(); + default: - throw new IllegalArgumentException("Unexpected feature type: " + _featureValue.getFeatureType().getBasicType()); - } - // If there is any registered FeatureValue handler that can handle this feature value, return the converted value per request. - if (_mvelContext.isPresent() && _mvelContext.get().canConvertFromAny(fv)) { - return _mvelContext.get().convertFromAny(fv).head(); - } else { - return fv; + throw new IllegalArgumentException("Unexpected feature type: " + _featureValue.getFeatureType().getBasicType()); } } diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/PostTransformationUtil.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/PostTransformationUtil.scala index 79518fd10..b1f75d662 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/PostTransformationUtil.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/PostTransformationUtil.scala @@ -130,7 +130,7 @@ private[offline] object PostTransformationUtil { featureType: FeatureTypes, mvelContext: Option[FeathrExpressionExecutionContext]): Try[FeatureValue] = Try { val args = Map(featureName -> Some(featureValue)) - val variableResolverFactory = new FeatureVariableResolverFactory(args, mvelContext) + val variableResolverFactory = new FeatureVariableResolverFactory(args) val transformedValue = MvelContext.executeExpressionWithPluginSupportWithFactory(compiledExpression, featureValue, variableResolverFactory, mvelContext.orNull) CoercionUtilsScala.coerceToFeatureValue(transformedValue, featureType) } diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/functions/MvelFeatureDerivationFunction.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/functions/MvelFeatureDerivationFunction.scala index e8f7d4196..42f09ad21 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/functions/MvelFeatureDerivationFunction.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/functions/MvelFeatureDerivationFunction.scala @@ -42,7 +42,7 @@ private[offline] class MvelFeatureDerivationFunction( override def getFeatures(inputs: Seq[Option[common.FeatureValue]]): Seq[Option[common.FeatureValue]] = { val argMap = (parameterNames zip inputs).toMap - val variableResolverFactory = new FeatureVariableResolverFactory(argMap, mvelContext) + val variableResolverFactory = new FeatureVariableResolverFactory(argMap) MvelUtils.executeExpression(compiledExpression, null, variableResolverFactory, featureName, mvelContext) match { case Some(value) => diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/functions/MvelFeatureDerivationFunction1.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/functions/MvelFeatureDerivationFunction1.scala index 7e403089e..2d5e30fb8 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/functions/MvelFeatureDerivationFunction1.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/functions/MvelFeatureDerivationFunction1.scala @@ -39,7 +39,7 @@ private[offline] class MvelFeatureDerivationFunction1( override def getFeatures(inputs: Seq[Option[common.FeatureValue]]): Seq[Option[common.FeatureValue]] = { val argMap = (parameterNames zip inputs).toMap - val variableResolverFactory = new FeatureVariableResolverFactory(argMap, mvelContext) + val variableResolverFactory = new FeatureVariableResolverFactory(argMap) MvelUtils.executeExpression(compiledExpression, null, variableResolverFactory, featureName, mvelContext) match { case Some(value) => diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/functions/SimpleMvelDerivationFunction.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/functions/SimpleMvelDerivationFunction.scala index 8d9695e2b..7ed9ad0a8 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/functions/SimpleMvelDerivationFunction.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/derived/functions/SimpleMvelDerivationFunction.scala @@ -44,7 +44,7 @@ private[offline] class SimpleMvelDerivationFunction(expression: String, featureN MvelContext.ensureInitialized() // In order to prevent MVEL from barfing if a feature is null, we use a custom variable resolver that understands `Option` - val variableResolverFactory = new FeatureVariableResolverFactory(args, mvelContext) + val variableResolverFactory = new FeatureVariableResolverFactory(args) if (TestFwkUtils.IS_DEBUGGER_ENABLED) { while(TestFwkUtils.DERIVED_FEATURE_COUNTER > 0) { diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/evaluator/transformation/AnchorUDFOperator.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/evaluator/transformation/AnchorUDFOperator.scala index f2921aac2..810b10168 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/evaluator/transformation/AnchorUDFOperator.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/evaluator/transformation/AnchorUDFOperator.scala @@ -53,7 +53,6 @@ object AnchorUDFOperator extends TransformationOperator { val (withFeaturesDf, outputJoinKeyColumnNames) = newExtractor match { case sparkExtractor: SimpleAnchorExtractorSpark => // Note that for Spark UDFs we only support SQL keys. - print("in simpleanchorextractorspark = " + newExtractor) val sqlKeyExtractor = new SQLSourceKeyExtractor(keySeq) val withKeyColumnDF = if (appendKeyColumns) sqlKeyExtractor.appendKeyColumns(inputDf) else inputDf val outputJoinKeyColumnNames = getFeatureKeyColumnNames(sqlKeyExtractor, withKeyColumnDF) @@ -61,7 +60,6 @@ object AnchorUDFOperator extends TransformationOperator { val tensorizedFeatureColumns = sparkExtractor.getFeatures(inputDf, Map()) val transformedColsAndFormats: Map[(String, Column), FeatureColumnFormat] = extractor match { case extractor2: SQLConfigurableAnchorExtractor => - print("in SQLConfigurableAnchorExtractor = " + newExtractor) // If instance of SQLConfigurableAnchorExtractor, get Tensor features // Get DataFrame schema for tensor based on FML or inferred tensor type. val featureSchemas = featureNamesInBatch.map(featureName => { diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/evaluator/transformation/TransformationOperatorUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/evaluator/transformation/TransformationOperatorUtils.scala index 631e399f3..5f1721a6f 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/evaluator/transformation/TransformationOperatorUtils.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/evaluator/transformation/TransformationOperatorUtils.scala @@ -42,9 +42,7 @@ object TransformationOperatorUtils { def createFeatureDF(inputDf: DataFrame, featureColumnDefs: Seq[(String, Column)]): DataFrame = { // first add a prefix to the feature column name in the schema val featureColumnNamePrefix = "_frame_sql_feature_prefix_" - print(inputDf.columns.mkString("Array(", ", ", ")")) val transformedDF = featureColumnDefs.foldLeft(inputDf)((baseDF, columnWithName) => { - print("COLUMN NAME = " + columnWithName) val columnName = featureColumnNamePrefix + columnWithName._1 baseDF.withColumn(columnName, expr(columnWithName._2.toString())) }) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenJob.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenJob.scala index 8df0d942d..c936f27c4 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenJob.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureGenJob.scala @@ -75,8 +75,6 @@ object FeatureGenJob { val dataSourceConfigs = DataSourceConfigUtils.getConfigs(cmdParser) val featureGenJobContext = new FeatureGenJobContext(workDir, paramsOverride, featureConfOverride) - println("dataSourceConfigs: ") - println(dataSourceConfigs) (applicationConfigPath, featureDefinitionsInput, featureGenJobContext, dataSourceConfigs) } diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala index 13ecd76d8..b76b54e52 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureJoinJob.scala @@ -69,7 +69,6 @@ object FeatureJoinJob { def run(ss: SparkSession, hadoopConf: Configuration, jobContext: FeathrJoinJobContext, dataPathHandlers: List[DataPathHandler]): Unit = { val dataLoaderHandlers: List[DataLoaderHandler] = dataPathHandlers.map(_.dataLoaderHandler) val joinConfig = FeatureJoinConfig.parseJoinConfig(hdfsFileReader(ss, jobContext.joinConfig)) - print("join config is, ",joinConfig) // check read authorization for observation data, and write authorization for output path checkAuthorization(ss, hadoopConf, jobContext, dataLoaderHandlers) @@ -86,7 +85,6 @@ object FeatureJoinJob { def stringifyFeatureNames(nameSet: Set[String]): String = nameSet.toSeq.sorted.toArray.mkString("\n\t") def hdfsFileReader(ss: SparkSession, path: String): String = { - print("ss.sparkContext.textFile(path),", path) ss.sparkContext.textFile(path).collect.mkString("\n") } diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala index 67614284b..a7ba6097e 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/job/FeatureTransformation.scala @@ -888,7 +888,6 @@ private[offline] object FeatureTransformation { val features = transformers map { case extractor: AnchorExtractor[IndexedRecord] => val features = extractor.getFeatures(record) - print(features) FeatureValueTypeValidator.validate(features, featureTypeConfigs) features case extractor => @@ -1423,7 +1422,6 @@ private[offline] object FeatureTransformation { val features = transformers map { case extractor: AnchorExtractor[Any] => val features = extractor.getFeatures(row) - print(features) FeatureValueTypeValidator.validate(features, featureTypeConfigs) features case extractor => diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/mvel/FeatureVariableResolverFactory.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/mvel/FeatureVariableResolverFactory.scala index 8f05ac3f0..a81e04d11 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/mvel/FeatureVariableResolverFactory.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/mvel/FeatureVariableResolverFactory.scala @@ -1,16 +1,13 @@ package com.linkedin.feathr.offline.mvel import com.linkedin.feathr.common.{FeatureValue, FeatureVariableResolver} -import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext import org.mvel2.integration.VariableResolver import org.mvel2.integration.impl.BaseVariableResolverFactory -import java.util.Optional import scala.collection.JavaConverters._ -private[offline] class FeatureVariableResolverFactory(features: Map[String, Option[FeatureValue]], mvelContext: Option[FeathrExpressionExecutionContext]) extends BaseVariableResolverFactory { - - variableResolvers = features.mapValues(x => new FeatureVariableResolver(x.orNull, Optional.ofNullable(mvelContext.orNull))).asInstanceOf[Map[String, VariableResolver]].asJava +private[offline] class FeatureVariableResolverFactory(features: Map[String, Option[FeatureValue]]) extends BaseVariableResolverFactory { + variableResolvers = features.mapValues(x => new FeatureVariableResolver(x.orNull)).asInstanceOf[Map[String, VariableResolver]].asJava override def isTarget(name: String): Boolean = features.contains(name) diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/mvel/plugins/FeathrExpressionExecutionContext.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/mvel/plugins/FeathrExpressionExecutionContext.scala index d279576c5..ba63a654c 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/mvel/plugins/FeathrExpressionExecutionContext.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/mvel/plugins/FeathrExpressionExecutionContext.scala @@ -79,28 +79,6 @@ class FeathrExpressionExecutionContext extends Serializable { } } - /** - * Check if there is registered converters that can handle the conversion. - * @param inputValue input value to check - * @return whether it can be converted or not - */ - def canConvertFromAny(inputValue: AnyRef): Boolean = { - val result = converters.value.filter(converter => converter._2.canConvertFrom(inputValue.getClass)) - result.nonEmpty - } - - /** - * Convert the input Check if there is registered converters that can handle the conversion. - * @param inputValue input value to convert - * @return return all converted values produced by registered converters - */ - def convertFromAny(inputValue: AnyRef): List[AnyRef] = { - converters.value.collect { - case converter if converter._2.canConvertFrom(inputValue.getClass) => - converter._2.convertFrom(inputValue) - }.toList - } - /** * Convert the input to output type using the registered converters * @param in value to be converted diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/CoercionUtilsScala.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/CoercionUtilsScala.scala index 69dce9b57..8c7cc1ed2 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/CoercionUtilsScala.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/CoercionUtilsScala.scala @@ -76,7 +76,6 @@ private[offline] object CoercionUtilsScala { } def coerceFieldToFeatureValue(row: Row, schema: StructType, fieldName: String, featureTypeConfig: FeatureTypeConfig): FeatureValue = { - print("ROW IS " + row + " and featureTypeConfig is " + featureTypeConfig + " and feature name is " + fieldName) val fieldIndex = schema.fieldIndex(fieldName) val fieldType = schema.toList(fieldIndex) val valueMap = if (row.get(fieldIndex) == null) { diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/TestFeathrUdfPlugins.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/TestFeathrUdfPlugins.scala index 950a1ad31..64d2cee62 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/TestFeathrUdfPlugins.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/TestFeathrUdfPlugins.scala @@ -1,5 +1,6 @@ package com.linkedin.feathr.offline +import com.linkedin.feathr.common.FeatureTypes import com.linkedin.feathr.offline.anchored.keyExtractor.AlienSourceKeyExtractorAdaptor import com.linkedin.feathr.offline.client.plugins.FeathrUdfPluginContext import com.linkedin.feathr.offline.derived.AlienDerivationFunctionAdaptor @@ -8,6 +9,7 @@ import com.linkedin.feathr.offline.plugins.{AlienFeatureValue, AlienFeatureValue import com.linkedin.feathr.offline.util.FeathrTestUtils import org.apache.spark.sql.Row import org.apache.spark.sql.types.{FloatType, StringType, StructField, StructType} +import org.testng.Assert.assertEquals import org.testng.annotations.Test class TestFeathrUdfPlugins extends FeathrIntegTest { @@ -17,7 +19,7 @@ class TestFeathrUdfPlugins extends FeathrIntegTest { private val mvelContext = new FeathrExpressionExecutionContext() // todo - support udf plugins through FCM - @Test (enabled = true) + @Test (enabled = false) def testMvelUdfPluginSupport: Unit = { mvelContext.setupExecutorMvelContext(classOf[AlienFeatureValue], new AlienFeatureValueTypeAdaptor(), ss.sparkContext) FeathrUdfPluginContext.registerUdfAdaptor(new AlienDerivationFunctionAdaptor(), ss.sparkContext) @@ -111,6 +113,8 @@ class TestFeathrUdfPlugins extends FeathrIntegTest { observationDataPath = "anchorAndDerivations/testMVELLoopExpFeature-observations.csv", mvelContext = Some(mvelContext)) + val f8Type = df.fdsMetadata.header.get.featureInfoMap.filter(_._1.getFeatureName == "f8").head._2.featureType.getFeatureType + assertEquals(f8Type, FeatureTypes.NUMERIC) val selectedColumns = Seq("a_id", "fA") val filteredDf = df.data.select(selectedColumns.head, selectedColumns.tail: _*) diff --git a/gradle.properties b/gradle.properties index 0433134fb..061616df2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=1.0.2-rc3 +version=1.0.2-rc4 SONATYPE_AUTOMATIC_RELEASE=true POM_ARTIFACT_ID=feathr_2.12