Skip to content

Commit

Permalink
Revert mvel log (feathr-ai#1140)
Browse files Browse the repository at this point in the history
* Revert "Allow alien value in MVEL-based derivations (feathr-ai#1120) and remove stdout statements"

This reverts commit 55290e7.

* updating rc version after last commit

---------

Co-authored-by: Anirudh Agarwal <aniagarw@aniagarw-mn1.linkedin.biz>
  • Loading branch information
2 people authored and Yuqing-cat committed May 23, 2023
1 parent 006d097 commit 6c439e2
Show file tree
Hide file tree
Showing 15 changed files with 19 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,19 @@
import com.linkedin.feathr.common.tensor.TensorIterator;
import com.linkedin.feathr.common.types.ValueType;
import com.linkedin.feathr.common.util.CoercionUtils;
import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext;
import org.mvel2.DataConversion;
import org.mvel2.integration.impl.SimpleValueResolver;

import java.util.Optional;


/**
* FeatureVariableResolver takes a FeatureValue object for member variable during MVEL expression evaluation,
* and then resolve the value for that variable.
*/
public class FeatureVariableResolver extends SimpleValueResolver {
private FeatureValue _featureValue;
private Optional<FeathrExpressionExecutionContext> _mvelContext = Optional.empty();
public FeatureVariableResolver(FeatureValue featureValue, Optional<FeathrExpressionExecutionContext> mvelContext) {

public FeatureVariableResolver(FeatureValue featureValue) {
super(featureValue);
_featureValue = featureValue;
_mvelContext = mvelContext;
}

@Override
Expand All @@ -30,27 +25,21 @@ public Object getValue() {
return null;
}

Object fv = null;
switch (_featureValue.getFeatureType().getBasicType()) {
case NUMERIC:
fv = _featureValue.getAsNumeric(); break;
return _featureValue.getAsNumeric();
case TERM_VECTOR:
fv = getValueFromTermVector(); break;
return getValueFromTermVector();
case BOOLEAN:
case CATEGORICAL:
case CATEGORICAL_SET:
case DENSE_VECTOR:

case TENSOR:
fv = getValueFromTensor(); break;
return getValueFromTensor();

default:
throw new IllegalArgumentException("Unexpected feature type: " + _featureValue.getFeatureType().getBasicType());
}
// If there is any registered FeatureValue handler that can handle this feature value, return the converted value per request.
if (_mvelContext.isPresent() && _mvelContext.get().canConvertFromAny(fv)) {
return _mvelContext.get().convertFromAny(fv).head();
} else {
return fv;
throw new IllegalArgumentException("Unexpected feature type: " + _featureValue.getFeatureType().getBasicType());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private[offline] object PostTransformationUtil {
featureType: FeatureTypes,
mvelContext: Option[FeathrExpressionExecutionContext]): Try[FeatureValue] = Try {
val args = Map(featureName -> Some(featureValue))
val variableResolverFactory = new FeatureVariableResolverFactory(args, mvelContext)
val variableResolverFactory = new FeatureVariableResolverFactory(args)
val transformedValue = MvelContext.executeExpressionWithPluginSupportWithFactory(compiledExpression, featureValue, variableResolverFactory, mvelContext.orNull)
CoercionUtilsScala.coerceToFeatureValue(transformedValue, featureType)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private[offline] class MvelFeatureDerivationFunction(

override def getFeatures(inputs: Seq[Option[common.FeatureValue]]): Seq[Option[common.FeatureValue]] = {
val argMap = (parameterNames zip inputs).toMap
val variableResolverFactory = new FeatureVariableResolverFactory(argMap, mvelContext)
val variableResolverFactory = new FeatureVariableResolverFactory(argMap)

MvelUtils.executeExpression(compiledExpression, null, variableResolverFactory, featureName, mvelContext) match {
case Some(value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private[offline] class MvelFeatureDerivationFunction1(

override def getFeatures(inputs: Seq[Option[common.FeatureValue]]): Seq[Option[common.FeatureValue]] = {
val argMap = (parameterNames zip inputs).toMap
val variableResolverFactory = new FeatureVariableResolverFactory(argMap, mvelContext)
val variableResolverFactory = new FeatureVariableResolverFactory(argMap)

MvelUtils.executeExpression(compiledExpression, null, variableResolverFactory, featureName, mvelContext) match {
case Some(value) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ private[offline] class SimpleMvelDerivationFunction(expression: String, featureN
MvelContext.ensureInitialized()

// In order to prevent MVEL from barfing if a feature is null, we use a custom variable resolver that understands `Option`
val variableResolverFactory = new FeatureVariableResolverFactory(args, mvelContext)
val variableResolverFactory = new FeatureVariableResolverFactory(args)

if (TestFwkUtils.IS_DEBUGGER_ENABLED) {
while(TestFwkUtils.DERIVED_FEATURE_COUNTER > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,13 @@ object AnchorUDFOperator extends TransformationOperator {
val (withFeaturesDf, outputJoinKeyColumnNames) = newExtractor match {
case sparkExtractor: SimpleAnchorExtractorSpark =>
// Note that for Spark UDFs we only support SQL keys.
print("in simpleanchorextractorspark = " + newExtractor)
val sqlKeyExtractor = new SQLSourceKeyExtractor(keySeq)
val withKeyColumnDF = if (appendKeyColumns) sqlKeyExtractor.appendKeyColumns(inputDf) else inputDf
val outputJoinKeyColumnNames = getFeatureKeyColumnNames(sqlKeyExtractor, withKeyColumnDF)

val tensorizedFeatureColumns = sparkExtractor.getFeatures(inputDf, Map())
val transformedColsAndFormats: Map[(String, Column), FeatureColumnFormat] = extractor match {
case extractor2: SQLConfigurableAnchorExtractor =>
print("in SQLConfigurableAnchorExtractor = " + newExtractor)
// If instance of SQLConfigurableAnchorExtractor, get Tensor features
// Get DataFrame schema for tensor based on FML or inferred tensor type.
val featureSchemas = featureNamesInBatch.map(featureName => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ object TransformationOperatorUtils {
def createFeatureDF(inputDf: DataFrame, featureColumnDefs: Seq[(String, Column)]): DataFrame = {
// first add a prefix to the feature column name in the schema
val featureColumnNamePrefix = "_frame_sql_feature_prefix_"
print(inputDf.columns.mkString("Array(", ", ", ")"))
val transformedDF = featureColumnDefs.foldLeft(inputDf)((baseDF, columnWithName) => {
print("COLUMN NAME = " + columnWithName)
val columnName = featureColumnNamePrefix + columnWithName._1
baseDF.withColumn(columnName, expr(columnWithName._2.toString()))
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ object FeatureGenJob {
val dataSourceConfigs = DataSourceConfigUtils.getConfigs(cmdParser)
val featureGenJobContext = new FeatureGenJobContext(workDir, paramsOverride, featureConfOverride)

println("dataSourceConfigs: ")
println(dataSourceConfigs)
(applicationConfigPath, featureDefinitionsInput, featureGenJobContext, dataSourceConfigs)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ object FeatureJoinJob {
def run(ss: SparkSession, hadoopConf: Configuration, jobContext: FeathrJoinJobContext, dataPathHandlers: List[DataPathHandler]): Unit = {
val dataLoaderHandlers: List[DataLoaderHandler] = dataPathHandlers.map(_.dataLoaderHandler)
val joinConfig = FeatureJoinConfig.parseJoinConfig(hdfsFileReader(ss, jobContext.joinConfig))
print("join config is, ",joinConfig)
// check read authorization for observation data, and write authorization for output path
checkAuthorization(ss, hadoopConf, jobContext, dataLoaderHandlers)

Expand All @@ -86,7 +85,6 @@ object FeatureJoinJob {
def stringifyFeatureNames(nameSet: Set[String]): String = nameSet.toSeq.sorted.toArray.mkString("\n\t")

def hdfsFileReader(ss: SparkSession, path: String): String = {
print("ss.sparkContext.textFile(path),", path)
ss.sparkContext.textFile(path).collect.mkString("\n")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,6 @@ private[offline] object FeatureTransformation {
val features = transformers map {
case extractor: AnchorExtractor[IndexedRecord] =>
val features = extractor.getFeatures(record)
print(features)
FeatureValueTypeValidator.validate(features, featureTypeConfigs)
features
case extractor =>
Expand Down Expand Up @@ -1423,7 +1422,6 @@ private[offline] object FeatureTransformation {
val features = transformers map {
case extractor: AnchorExtractor[Any] =>
val features = extractor.getFeatures(row)
print(features)
FeatureValueTypeValidator.validate(features, featureTypeConfigs)
features
case extractor =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package com.linkedin.feathr.offline.mvel

import com.linkedin.feathr.common.{FeatureValue, FeatureVariableResolver}
import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
import org.mvel2.integration.VariableResolver
import org.mvel2.integration.impl.BaseVariableResolverFactory

import java.util.Optional
import scala.collection.JavaConverters._

private[offline] class FeatureVariableResolverFactory(features: Map[String, Option[FeatureValue]], mvelContext: Option[FeathrExpressionExecutionContext]) extends BaseVariableResolverFactory {

variableResolvers = features.mapValues(x => new FeatureVariableResolver(x.orNull, Optional.ofNullable(mvelContext.orNull))).asInstanceOf[Map[String, VariableResolver]].asJava
private[offline] class FeatureVariableResolverFactory(features: Map[String, Option[FeatureValue]]) extends BaseVariableResolverFactory {
variableResolvers = features.mapValues(x => new FeatureVariableResolver(x.orNull)).asInstanceOf[Map[String, VariableResolver]].asJava

override def isTarget(name: String): Boolean = features.contains(name)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,28 +79,6 @@ class FeathrExpressionExecutionContext extends Serializable {
}
}

/**
* Check if there is registered converters that can handle the conversion.
* @param inputValue input value to check
* @return whether it can be converted or not
*/
def canConvertFromAny(inputValue: AnyRef): Boolean = {
val result = converters.value.filter(converter => converter._2.canConvertFrom(inputValue.getClass))
result.nonEmpty
}

/**
* Convert the input Check if there is registered converters that can handle the conversion.
* @param inputValue input value to convert
* @return return all converted values produced by registered converters
*/
def convertFromAny(inputValue: AnyRef): List[AnyRef] = {
converters.value.collect {
case converter if converter._2.canConvertFrom(inputValue.getClass) =>
converter._2.convertFrom(inputValue)
}.toList
}

/**
* Convert the input to output type using the registered converters
* @param in value to be converted
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ private[offline] object CoercionUtilsScala {
}

def coerceFieldToFeatureValue(row: Row, schema: StructType, fieldName: String, featureTypeConfig: FeatureTypeConfig): FeatureValue = {
print("ROW IS " + row + " and featureTypeConfig is " + featureTypeConfig + " and feature name is " + fieldName)
val fieldIndex = schema.fieldIndex(fieldName)
val fieldType = schema.toList(fieldIndex)
val valueMap = if (row.get(fieldIndex) == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.feathr.offline

import com.linkedin.feathr.common.FeatureTypes
import com.linkedin.feathr.offline.anchored.keyExtractor.AlienSourceKeyExtractorAdaptor
import com.linkedin.feathr.offline.client.plugins.FeathrUdfPluginContext
import com.linkedin.feathr.offline.derived.AlienDerivationFunctionAdaptor
Expand All @@ -8,6 +9,7 @@ import com.linkedin.feathr.offline.plugins.{AlienFeatureValue, AlienFeatureValue
import com.linkedin.feathr.offline.util.FeathrTestUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{FloatType, StringType, StructField, StructType}
import org.testng.Assert.assertEquals
import org.testng.annotations.Test

class TestFeathrUdfPlugins extends FeathrIntegTest {
Expand All @@ -17,7 +19,7 @@ class TestFeathrUdfPlugins extends FeathrIntegTest {
private val mvelContext = new FeathrExpressionExecutionContext()

// todo - support udf plugins through FCM
@Test (enabled = true)
@Test (enabled = false)
def testMvelUdfPluginSupport: Unit = {
mvelContext.setupExecutorMvelContext(classOf[AlienFeatureValue], new AlienFeatureValueTypeAdaptor(), ss.sparkContext)
FeathrUdfPluginContext.registerUdfAdaptor(new AlienDerivationFunctionAdaptor(), ss.sparkContext)
Expand Down Expand Up @@ -111,6 +113,8 @@ class TestFeathrUdfPlugins extends FeathrIntegTest {
observationDataPath = "anchorAndDerivations/testMVELLoopExpFeature-observations.csv",
mvelContext = Some(mvelContext))

val f8Type = df.fdsMetadata.header.get.featureInfoMap.filter(_._1.getFeatureName == "f8").head._2.featureType.getFeatureType
assertEquals(f8Type, FeatureTypes.NUMERIC)

val selectedColumns = Seq("a_id", "fA")
val filteredDf = df.data.select(selectedColumns.head, selectedColumns.tail: _*)
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
version=1.0.2-rc3
version=1.0.2-rc4
SONATYPE_AUTOMATIC_RELEASE=true
POM_ARTIFACT_ID=feathr_2.12

0 comments on commit 6c439e2

Please sign in to comment.