Skip to content

Commit

Permalink
Support SWA with groupBy to 1d tensor conversion (feathr-ai#748)
Browse files Browse the repository at this point in the history
* Support SWA with groupBy to 1d tensor conversion
  • Loading branch information
jaymo001 authored and hyingyang-linkedin committed Oct 25, 2022
1 parent aa2f4cf commit a136cf8
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ package com.linkedin.feathr.offline.transformation

import com.linkedin.feathr.common.exception.{ErrorLabel, FeathrException}
import com.linkedin.feathr.common.tensor.TensorData

import java.util
import com.linkedin.feathr.common.util.CoercionUtils
import com.linkedin.feathr.offline.util.FeaturizedDatasetUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._

import java.util
import scala.collection.JavaConverters._
import scala.collection.convert.Wrappers.JMapWrapper
import scala.collection.mutable
Expand Down Expand Up @@ -253,7 +252,13 @@ private[offline] object FDSConversionUtils {
case values: util.ArrayList[Any] =>
values.asScala.toArray
case values: mutable.WrappedArray[Any] =>
values.asInstanceOf[mutable.WrappedArray[Any]].toArray
if (values.nonEmpty && values(0).isInstanceOf[GenericRowWithSchema]) {
// Assuming the result is returned by an SWA feature with groupBy, hence keeping only the
// feature value as an array and dropping the index info.
values.asInstanceOf[mutable.WrappedArray[GenericRowWithSchema]].map(v => v.get(v.size - 1)).toArray
} else {
values.toArray
}
case values: List[Any] =>
values.toArray
case mapValues: Map[Integer, Any] =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@ package com.linkedin.feathr.offline.util
import com.linkedin.feathr.common.TensorUtils
import com.linkedin.feathr.common.tensor.{TensorType, Tensors}
import com.linkedin.feathr.common.types.PrimitiveType

import java.util
import java.util.Collections
import com.linkedin.feathr.offline.AssertFeatureUtils
import com.linkedin.feathr.offline.transformation.FDSConversionUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema}
import org.apache.spark.sql.types._
import org.scalatest.testng.TestNGSuite
import org.testng.Assert.{assertEquals, assertTrue}
import org.testng.annotations.{DataProvider, Test}

import java.util
import java.util.Collections
import scala.collection.mutable

class TestFDSConversionUtil extends TestNGSuite {
Expand Down Expand Up @@ -141,10 +140,18 @@ class TestFDSConversionUtil extends TestNGSuite {

@DataProvider
def dataForTestConvertRawValueTo1DFDSDenseTensorRowTz(): Array[Array[Any]] = {
val eleType = StructType(
StructField("group", IntegerType, false) ::
StructField("value", IntegerType, false) :: Nil
)
val row1 = new GenericRowWithSchema(Array(1, 3), eleType)
val row2 = new GenericRowWithSchema(Array(2, 4), eleType)
Array(
Array(mutable.WrappedArray.make(Array(2.0f, 6.0f)), util.Arrays.asList(2.0f, 6.0f).toArray),
Array(Array(1.1).toList, util.Arrays.asList(1.1).toArray),
Array(Map("a" -> 1.1), util.Arrays.asList(1.1).toArray)
Array(Map("a" -> 1.1), util.Arrays.asList(1.1).toArray),
// Simulate a raw value returned by an SWA feature with groupBy
Array(mutable.WrappedArray.make(Array(row1, row2)), util.Arrays.asList(3, 4).toArray)
)
}
@Test(dataProvider = "dataForTestConvertRawValueTo1DFDSDenseTensorRowTz")
Expand Down

0 comments on commit a136cf8

Please sign in to comment.