Support #LATEST and timePartitionPattern at the same time (#1198)
jaymo001 committed Jun 30, 2023
1 parent 169b86e commit bfe9a40
Showing 10 changed files with 267 additions and 35 deletions.
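This commit lets a source declare a `timePartitionPattern` together with a `#LATEST` placeholder in its path. A minimal sketch of such a source definition, using the path and names from the integration test added further down:

```hocon
sources: {
  swaSource: {
    // The path contains both the time-partition directories (yyyy/MM/dd) and a #LATEST placeholder.
    location: { path: "src/test/resources/slidingWindowAgg/localSWAAnchorTestFeatureData/daily/yyyy/MM/dd/postfixPath/#LATEST" }
    // Declaring the same pattern lets concrete dates be substituted before #LATEST is resolved.
    timePartitionPattern: "yyyy/MM/dd"
  }
}
```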
@@ -1,6 +1,7 @@
package com.linkedin.feathr.offline.source

import com.linkedin.feathr.offline.config.location.{DataLocation, SimplePath}
import com.linkedin.feathr.offline.source.DataSource.resolveLatest
import com.linkedin.feathr.offline.source.SourceFormatType.SourceFormatType
import com.linkedin.feathr.offline.util.{AclCheckUtils, HdfsUtils, LocalFeatureJoinUtils}
import org.apache.spark.sql.SparkSession
@@ -27,36 +28,20 @@ private[offline] case class DataSource(
)
extends Serializable {
private lazy val ss: SparkSession = SparkSession.builder().getOrCreate()
val path: String = resolveLatest(location.getPath, None)
val path: String = if (!timePartitionPattern.isDefined) {
resolveLatest(ss, location.getPath, None)
} else {
location.getPath
}
// 'postfixPath' only works for paths with timePartitionPattern
val postPath: String = if(timePartitionPattern.isDefined && postfixPath.isDefined) postfixPath.get else ""
val pathList: Array[String] =
if (location.isInstanceOf[SimplePath] && sourceType == SourceFormatType.LIST_PATH) {
path.split(";").map(resolveLatest(_, None))
path.split(";").map(resolveLatest(ss, _, None))
} else {
Array(path)
}

// resolve path with #LATEST
def resolveLatest(path: String, mockDataBaseDir: Option[String]): String = {
Try(
if (path.contains(AclCheckUtils.LATEST_PATTERN)) {
val hadoopConf = ss.sparkContext.hadoopConfiguration
if (ss.sparkContext.isLocal && LocalFeatureJoinUtils.getMockPathIfExist(path, hadoopConf, mockDataBaseDir).isDefined) {
val mockPath = LocalFeatureJoinUtils.getMockPathIfExist(path, hadoopConf, mockDataBaseDir).get
val resolvedPath = HdfsUtils.getLatestPath(mockPath, hadoopConf)
LocalFeatureJoinUtils.getOriginalFromMockPath(resolvedPath, mockDataBaseDir)
} else {
HdfsUtils.getLatestPath(path, hadoopConf)
}
} else {
path
}
) match {
case Success(resolvedPath) => resolvedPath
case Failure(_) => path // resolved failed
}
}

override def toString(): String = "path: " + path + ", sourceType:" + sourceType
}
@@ -76,4 +61,26 @@ object DataSource {
def apply(inputLocation: DataLocation,
sourceType: SourceFormatType): DataSource = DataSource(inputLocation, sourceType, None, None, None)

// resolve path with #LATEST
def resolveLatest(ss: SparkSession, path: String, mockDataBaseDir: Option[String]): String = {
Try(
if (path.contains(AclCheckUtils.LATEST_PATTERN)) {
val hadoopConf = ss.sparkContext.hadoopConfiguration
if (ss.sparkContext.isLocal && LocalFeatureJoinUtils.getMockPathIfExist(path, hadoopConf, mockDataBaseDir).isDefined) {
val mockPath = LocalFeatureJoinUtils.getMockPathIfExist(path, hadoopConf, mockDataBaseDir).get
val resolvedPath = HdfsUtils.getLatestPath(mockPath, hadoopConf)
LocalFeatureJoinUtils.getOriginalFromMockPath(resolvedPath, mockDataBaseDir)
} else {
HdfsUtils.getLatestPath(path, hadoopConf)
}
} else {
path
}
) match {
case Success(resolvedPath) => resolvedPath
case Failure(e) => {
path // resolution failed
}
}
}
}
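The `resolveLatest` helper moves from the `DataSource` instance into its companion object and now takes the `SparkSession` explicitly, so other components (such as the time-based path generator below) can reuse it; resolution is also skipped up front when a `timePartitionPattern` is set. A minimal sketch of a call site inside the offline module (the path here is illustrative):

```scala
import com.linkedin.feathr.offline.source.DataSource.resolveLatest
import org.apache.spark.sql.SparkSession

val ss = SparkSession.builder().getOrCreate()
// "#LATEST" is expanded to the most recent partition under the parent directory;
// if resolution fails, the original path is returned unchanged.
val resolved: String = resolveLatest(ss, "/data/features/daily/#LATEST", None)
```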
@@ -146,7 +146,12 @@ private[offline] object PathPartitionedTimeSeriesSourceAccessor {
val timeFormatString = pathInfo.datePathPattern

val dataframes = pathList.map(path => {
val timeStr = path.substring(path.length - (timeFormatString.length + postfixPath.length), path.length - postfixPath.length)
val timeStr = if (pathInfo.basePath.contains(pathInfo.datePathPattern)) {
val idx = pathInfo.basePath.indexOf(pathInfo.datePathPattern)
path.substring(idx, idx + pathInfo.datePathPattern.length)
} else {
path.substring(path.length - (timeFormatString.length + postfixPath.length), path.length - postfixPath.length)
}
val time = OfflineDateTimeUtils.createTimeFromString(timeStr, timeFormatString)
val interval = DateTimeInterval.createFromInclusive(time, time, dateTimeResolution)
val df = fileLoaderFactory.create(path).loadDataFrame()
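The accessor previously always read the time string from the tail of each resolved path; when the configured base path itself contains the date pattern (now possible because `#LATEST` or a postfix can follow the `yyyy/MM/dd` directories), the time string is instead taken at the pattern's offset inside the path. A self-contained sketch of the two branches, with illustrative paths only:

```scala
// Illustrative values; mirrors the substring logic in the hunk above.
val datePathPattern = "yyyy/MM/dd"
val postfixPath = ""

// New branch: base path already contains the date pattern, e.g. because #LATEST comes after it.
val basePath     = "src/test/resources/featureData/daily/yyyy/MM/dd/#LATEST/"
val resolvedPath = "src/test/resources/featureData/daily/2019/05/19/run-3/"
val idx = basePath.indexOf(datePathPattern)
val timeStrNew = resolvedPath.substring(idx, idx + datePathPattern.length) // "2019/05/19"

// Existing branch: the date pattern sits at the end of the path (minus any postfix).
val tailPath = "src/test/resources/featureData/daily/2019/05/19"
val timeStrOld = tailPath.substring(
  tailPath.length - (datePathPattern.length + postfixPath.length),
  tailPath.length - postfixPath.length) // "2019/05/19"
```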
@@ -105,7 +105,8 @@ private[offline] object AvroJsonDataLoader {
val sc = ss.sparkContext
require(sc.isLocal)
require(path.endsWith(".avro.json"))
val contents = Source.fromResource(path).getLines().mkString
val source = Source.fromResource(path)
val contents = source.getLines().mkString
val jackson = new ObjectMapper(new HoconFactory)
val tree = jackson.readTree(contents)
val jsonDataArray = tree.get("data")
@@ -99,18 +99,23 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation,
} catch {
case _: Throwable =>
try {
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath)
new AvroJsonDataLoader(ss, dataPath + "/data.avro.json").loadDataFrame()
} catch {
case e: Exception =>
// If loading from the data source failed, retry automatically, as the failure might be due to the data source still being written to.
log.info(s"Loading ${location} failed, retrying for ${retry}-th time..")
if (retry > 0) {
Thread.sleep(retryWaitTime)
loadDataFrameWithRetry(dataIOParameters, jobConf, retry - 1)
} else {
// Throwing exception to avoid dataLoaderHandler hook exception from being suppressed.
throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR, s"Failed to load ${dataPath} after ${initialNumOfRetries} retries" +
s" and retry time of ${retryWaitTime}ms. Error message: ${e.getMessage}")
try {
ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath)
} catch {
case e: Exception =>
// If loading from the data source failed, retry automatically, as the failure might be due to the data source still being written to.
log.info(s"Loading ${location} failed, retrying for ${retry}-th time..")
if (retry > 0) {
Thread.sleep(retryWaitTime)
loadDataFrameWithRetry(dataIOParameters, jobConf, retry - 1)
} else {
// Throwing exception to avoid dataLoaderHandler hook exception from being suppressed.
throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR, s"Failed to load ${dataPath} after ${initialNumOfRetries} retries" +
s" and retry time of ${retryWaitTime}ms. Error message: ${e.getMessage}")
}
}
}
}
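For local runs this reorders the fallback chain: after the initial read attempt (not shown in this hunk) fails, the loader now tries `<path>/data.avro.json` via `AvroJsonDataLoader` before falling back to CSV, and only a CSV failure triggers the wait-and-retry loop. A self-contained sketch of that ordering, using stand-in loader functions rather than the real Feathr calls:

```scala
// Stand-ins for the real readers; readAvroJson fails here to exercise the CSV fallback.
def readAvroJson(path: String): String = sys.error(s"no $path/data.avro.json")
def readCsv(path: String): String = s"csv:$path"

// Mirrors the nesting in the hunk above: avro.json first, then CSV,
// and only when CSV also fails do we sleep and retry (or let the error propagate).
def loadWithFallback(path: String, retriesLeft: Int, retryWaitMs: Long): String =
  try readAvroJson(path)
  catch {
    case _: Exception =>
      try readCsv(path)
      catch {
        case _: Exception if retriesLeft > 0 =>
          Thread.sleep(retryWaitMs)
          loadWithFallback(path, retriesLeft - 1, retryWaitMs)
      }
  }

loadWithFallback("/tmp/example", retriesLeft = 2, retryWaitMs = 100) // "csv:/tmp/example"
```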
@@ -1,8 +1,11 @@
package com.linkedin.feathr.offline.source.pathutil

import com.linkedin.feathr.offline.source.DataSource.resolveLatest
import com.linkedin.feathr.offline.util.datetime.{DateTimeInterval, OfflineDateTimeUtils}
import org.apache.spark.sql.SparkSession

import java.time.format.DateTimeFormatter
import scala.util.{Failure, Success, Try}

/**
* Generate a list of paths based on the given time interval
@@ -30,8 +33,25 @@ private[offline] class TimeBasedHdfsPathGenerator(pathChecker: PathChecker) {
val chronUnit = OfflineDateTimeUtils.dateTimeResolutionToChronoUnit(dateTimeResolution)
val numUnits = chronUnit.between(factDataStartTime, factDataEndTime).toInt
val formatter = DateTimeFormatter.ofPattern(pathInfo.datePathPattern).withZone(OfflineDateTimeUtils.DEFAULT_ZONE_ID)
val ss = SparkSession.builder().getOrCreate()
val filePaths = (0 until numUnits)
.map(offset => pathInfo.basePath + formatter.format(factDataStartTime.plus(offset, chronUnit)) + postfixPath).distinct
.map(offset => {
val time = formatter.format(factDataStartTime.plus(offset, chronUnit))
val withTimePath = if (pathInfo.basePath.contains(pathInfo.datePathPattern)) {
pathInfo.basePath.replaceAll(pathInfo.datePathPattern, time)
} else {
pathInfo.basePath + time + postfixPath
}
Try {
resolveLatest(ss, withTimePath, None)
} match {
case Success(resolvedPath) => resolvedPath
case Failure(_) => {
withTimePath
}
}
// Resolve latest again
}).distinct

if (ignoreMissingFiles) {
filePaths.filter(filePath => pathChecker.exists(filePath) && pathChecker.nonEmpty(filePath))
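Concretely, when the base path already contains the date pattern, each offset in the interval is substituted into the pattern first and `#LATEST` is resolved afterwards, falling back to the unresolved path if resolution fails. A small reproduction of the path construction using the values from the new unit test at the bottom of this commit (where `#LATEST` happens to resolve to a directory named `hourly`); the `replace` call stands in for `resolveLatest`:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

val basePath  = "src/test/resources/generation/#LATEST/yyyy/MM/dd"
val formatter = DateTimeFormatter.ofPattern("yyyy/MM/dd")
val start     = LocalDate.of(2019, 5, 18)

// A daily interval [2019-05-18, 2019-05-20) covers two units.
val paths = (0 until 2).map { offset =>
  val withTime = basePath.replaceAll("yyyy/MM/dd", formatter.format(start.plusDays(offset)))
  withTime.replace("#LATEST", "hourly") // stand-in for DataSource.resolveLatest(ss, withTime, None)
}
// paths == Vector("src/test/resources/generation/hourly/2019/05/18",
//                 "src/test/resources/generation/hourly/2019/05/19")
```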
@@ -0,0 +1,119 @@
{
"schema": {
"type": "record",
"name": "NTVObs",
"doc": "Daily or multi-day aggregated a activity features generated from similar data sources.",
"namespace": "com.linkedin.feathr.offline.data",
"fields": [
{
"name": "x",
"type": [
"null",
"string"
],
"doc": ""
},
{
"name": "y",
"type": "string",
"doc": ""
},
{
"name": "timestamp",
"type": "string"
},
{
"name": "passthroughFeatures",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "features",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "term",
"type": "string"
},
{
"name": "value",
"type": "float"
}
]
}
}
}
]
},
"data": [
{
"x": {
"string": "1"
},
"y": "a2",
"timestamp": "2018-05-03",
"passthroughFeatures": [
{
"name": "f1f1",
"term": "f1t1",
"value": 12
}
]
},
{
"x": {
"string": "a2"
},
"y": "a1",
"timestamp": "2018-05-03",
"passthroughFeatures": [
{
"name": "f1f1",
"term": "f1t1",
"value": 12
}
]
},
{
"x": null,
"y": "a5",
"timestamp": "2018-05-03",
"passthroughFeatures": [
{
"name": "f1f1",
"term": "f1t1",
"value": 12
}
]
},
{
"x": null,
"y": "a8",
"timestamp": "2018-05-03",
"passthroughFeatures": [
{
"name": "f1f1",
"term": "f1t1",
"value": 12
}
]
},
{
"x": {
"string": "xyz"
},
"y": "abc",
"timestamp": "2018-04-30",
"passthroughFeatures": [
{
"name": "f2f2",
"term": "f2t2",
"value": 12
}
]
}
]
}
Binary file not shown.
Binary file not shown.
@@ -255,6 +255,69 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest {
assertEquals(row1f1f1, TestUtils.build1dSparseTensorFDSRow(Array("f1t1"), Array(12.0f)))
}


/**
* test SWA with a source that combines #LATEST and timePartitionPattern
*/
@Test
def testLocalAnchorSWAWithLatest: Unit = {
val df = runLocalFeatureJoinForTest(
joinConfigAsString = """
| settings: {
| observationDataTimeSettings: {
| absoluteTimeRange: {
| startTime: "2018-05-01"
| endTime: "2018-05-03"
| timeFormat: "yyyy-MM-dd"
| }
| }
| joinTimeSettings: {
| timestampColumn: {
| def: timestamp
| format: "yyyy-MM-dd"
| }
| }
|}
|
|features: [
| {
| key: [x],
| featureList: ["f1"]
| }
|]
""".stripMargin,
featureDefAsString = """
|sources: {
| swaSource: {
| location: { path: "src/test/resources/slidingWindowAgg/localSWAAnchorTestFeatureData/daily/yyyy/MM/dd/postfixPath/#LATEST" }
| timePartitionPattern: "yyyy/MM/dd"
| }
|}
|
|anchors: {
| swaAnchor: {
| source: "swaSource"
| key: "id"
| features: {
| f1: {
| def: "int_value"
| window: 3d
| aggregation: LATEST
| }
| }
| }
|}
""".stripMargin,
"slidingWindowAgg/localAnchorTestObsData2.avro.json").data
df.show()

// validate the output feature value
val featureList = df.collect().sortBy(row => if (row.get(0) != null) row.getAs[String]("x") else "null")
val row0 = featureList(0)
val row0f1 = row0.getAs[Float]("f1")
assertEquals(row0f1, 1.0f)
}

/**
* test SWA with lateralview parameters and ADD_DEFAULT_COL_FOR_MISSING_DATA flag set
*/
@@ -24,6 +24,18 @@ class TestTimeBasedHdfsPathGenerator extends TestFeathr with MockitoSugar{
verifyNoMoreInteractions(mockPathChecker)
}

@Test(description = "test generate daily file list with latest")
def testGenerateDailyFilesWithLatest() : Unit = {
val mockPathChecker = mock[PathChecker]
val pathGenerator = new TimeBasedHdfsPathGenerator(mockPathChecker)

val pathInfo = PathInfo("src/test/resources/generation/#LATEST/yyyy/MM/dd", DateTimeResolution.DAILY, "yyyy/MM/dd")
val interval = TestUtils.createDailyInterval("2019-05-18", "2019-05-20")
val pathList = pathGenerator.generate(pathInfo, interval, false)
assertEquals(pathList.toList, List("src/test/resources/generation/hourly/2019/05/18", "src/test/resources/generation/hourly/2019/05/19"))
verifyNoMoreInteractions(mockPathChecker)
}

@Test(description = "test generate hourly file list")
def testGenerateHourlyFiles() : Unit = {
val mockPathChecker = mock[PathChecker]
