diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/DataSource.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/DataSource.scala
index a3f13b1f8..906ba7bd3 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/DataSource.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/DataSource.scala
@@ -1,6 +1,7 @@
 package com.linkedin.feathr.offline.source
 
 import com.linkedin.feathr.offline.config.location.{DataLocation, SimplePath}
+import com.linkedin.feathr.offline.source.DataSource.resolveLatest
 import com.linkedin.feathr.offline.source.SourceFormatType.SourceFormatType
 import com.linkedin.feathr.offline.util.{AclCheckUtils, HdfsUtils, LocalFeatureJoinUtils}
 import org.apache.spark.sql.SparkSession
@@ -27,36 +28,22 @@ private[offline] case class DataSource(
 ) extends Serializable {
 
   private lazy val ss: SparkSession = SparkSession.builder().getOrCreate()
-  val path: String = resolveLatest(location.getPath, None)
+  // Paths with a timePartitionPattern are expanded later by the path generator,
+  // so #LATEST is only resolved eagerly for non-time-partitioned sources.
+  val path: String = if (timePartitionPattern.isEmpty) {
+    resolveLatest(ss, location.getPath, None)
+  } else {
+    location.getPath
+  }
 
   // 'postfixPath' only works for paths with timePartitionPattern
   val postPath: String = if(timePartitionPattern.isDefined && postfixPath.isDefined) postfixPath.get else ""
 
   val pathList: Array[String] = if (location.isInstanceOf[SimplePath] && sourceType == SourceFormatType.LIST_PATH) {
-    path.split(";").map(resolveLatest(_, None))
+    path.split(";").map(resolveLatest(ss, _, None))
   } else {
     Array(path)
   }
 
-  // resolve path with #LATEST
-  def resolveLatest(path: String, mockDataBaseDir: Option[String]): String = {
-    Try(
-      if (path.contains(AclCheckUtils.LATEST_PATTERN)) {
-        val hadoopConf = ss.sparkContext.hadoopConfiguration
-        if (ss.sparkContext.isLocal && LocalFeatureJoinUtils.getMockPathIfExist(path, hadoopConf, mockDataBaseDir).isDefined) {
-          val mockPath = LocalFeatureJoinUtils.getMockPathIfExist(path, hadoopConf, mockDataBaseDir).get
-          val resolvedPath = HdfsUtils.getLatestPath(mockPath, hadoopConf)
-          LocalFeatureJoinUtils.getOriginalFromMockPath(resolvedPath, mockDataBaseDir)
-        } else {
-          HdfsUtils.getLatestPath(path, hadoopConf)
-        }
-      } else {
-        path
-      }
-    ) match {
-      case Success(resolvedPath) => resolvedPath
-      case Failure(_) => path // resolved failed
-    }
-  }
   override def toString(): String = "path: " + path + ", sourceType:" + sourceType
 }
@@ -76,4 +63,24 @@ object DataSource {
 
   def apply(inputLocation: DataLocation, sourceType: SourceFormatType): DataSource =
     DataSource(inputLocation, sourceType, None, None, None)
+
+  // Resolve a path containing #LATEST to the latest concrete path on storage.
+  def resolveLatest(ss: SparkSession, path: String, mockDataBaseDir: Option[String]): String = {
+    Try(
+      if (path.contains(AclCheckUtils.LATEST_PATTERN)) {
+        val hadoopConf = ss.sparkContext.hadoopConfiguration
+        if (ss.sparkContext.isLocal && LocalFeatureJoinUtils.getMockPathIfExist(path, hadoopConf, mockDataBaseDir).isDefined) {
+          val mockPath = LocalFeatureJoinUtils.getMockPathIfExist(path, hadoopConf, mockDataBaseDir).get
+          val resolvedPath = HdfsUtils.getLatestPath(mockPath, hadoopConf)
+          LocalFeatureJoinUtils.getOriginalFromMockPath(resolvedPath, mockDataBaseDir)
+        } else {
+          HdfsUtils.getLatestPath(path, hadoopConf)
+        }
+      } else {
+        path
+      }
+    ) match {
+      case Success(resolvedPath) => resolvedPath
+      case Failure(_) => path // resolution failed; fall back to the unresolved path
+    }
+  }
 }
\ No newline at end of file
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala
index 89d397cfa..5077cc4ff 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/accessor/PathPartitionedTimeSeriesSourceAccessor.scala
@@ -146,7 +146,14 @@ private[offline] object PathPartitionedTimeSeriesSourceAccessor {
     val timeFormatString = pathInfo.datePathPattern
 
     val dataframes = pathList.map(path => {
-      val timeStr = path.substring(path.length - (timeFormatString.length + postfixPath.length), path.length - postfixPath.length)
+      // If the date pattern is embedded in the base path (e.g. daily/yyyy/MM/dd/postfixPath),
+      // extract the timestamp at the same offset; otherwise it is a suffix of the path.
+      val timeStr = if (pathInfo.basePath.contains(pathInfo.datePathPattern)) {
+        val idx = pathInfo.basePath.indexOf(pathInfo.datePathPattern)
+        path.substring(idx, idx + pathInfo.datePathPattern.length)
+      } else {
+        path.substring(path.length - (timeFormatString.length + postfixPath.length), path.length - postfixPath.length)
+      }
       val time = OfflineDateTimeUtils.createTimeFromString(timeStr, timeFormatString)
       val interval = DateTimeInterval.createFromInclusive(time, time, dateTimeResolution)
       val df = fileLoaderFactory.create(path).loadDataFrame()
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/AvroJsonDataLoader.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/AvroJsonDataLoader.scala
index 4866dfcf7..7f19707c0 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/AvroJsonDataLoader.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/AvroJsonDataLoader.scala
@@ -105,7 +105,9 @@
     val sc = ss.sparkContext
     require(sc.isLocal)
     require(path.endsWith(".avro.json"))
-    val contents = Source.fromResource(path).getLines().mkString
+    val source = Source.fromResource(path)
+    val contents = source.getLines().mkString
+    source.close() // close the handle so the resource is not leaked
     val jackson = new ObjectMapper(new HoconFactory)
     val tree = jackson.readTree(contents)
     val jsonDataArray = tree.get("data")
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala
index bf188bd09..b9b650eba 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala
@@ -99,18 +99,23 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation,
     } catch {
       case _: Throwable =>
         try {
-          ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath)
+          new AvroJsonDataLoader(ss, dataPath + "/data.avro.json").loadDataFrame()
         } catch {
           case e: Exception =>
-            // If data loading from source failed, retry it automatically, as it might due to data source still being written into.
-            log.info(s"Loading ${location} failed, retrying for ${retry}-th time..")
-            if (retry > 0) {
-              Thread.sleep(retryWaitTime)
-              loadDataFrameWithRetry(dataIOParameters, jobConf, retry - 1)
-            } else {
-              // Throwing exception to avoid dataLoaderHandler hook exception from being suppressed.
-              throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR, s"Failed to load ${dataPath} after ${initialNumOfRetries} retries" +
-                s" and retry time of ${retryWaitTime}ms. Error message: ${e.getMessage}")
+            try {
+              ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath)
+            } catch {
+              case e: Exception =>
+                // If loading from the source failed, retry automatically, as the failure might be due to the data source still being written to.
+                log.info(s"Loading ${location} failed, retrying for ${retry}-th time..")
+                if (retry > 0) {
+                  Thread.sleep(retryWaitTime)
+                  loadDataFrameWithRetry(dataIOParameters, jobConf, retry - 1)
+                } else {
+                  // Throw the exception so it is not suppressed by the dataLoaderHandler hook.
+                  throw new FeathrInputDataException(ErrorLabel.FEATHR_USER_ERROR, s"Failed to load ${dataPath} after ${initialNumOfRetries} retries" +
+                    s" and retry time of ${retryWaitTime}ms. Error message: ${e.getMessage}")
+                }
             }
         }
     }
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/pathutil/TimeBasedHdfsPathGenerator.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/pathutil/TimeBasedHdfsPathGenerator.scala
index d710ebc47..202ee159e 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/pathutil/TimeBasedHdfsPathGenerator.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/pathutil/TimeBasedHdfsPathGenerator.scala
@@ -1,8 +1,11 @@
 package com.linkedin.feathr.offline.source.pathutil
 
+import com.linkedin.feathr.offline.source.DataSource.resolveLatest
 import com.linkedin.feathr.offline.util.datetime.{DateTimeInterval, OfflineDateTimeUtils}
+import org.apache.spark.sql.SparkSession
 
 import java.time.format.DateTimeFormatter
+import scala.util.{Failure, Success, Try}
 
 /**
  * Generate a list of paths based on the given time interval
@@ -30,8 +33,23 @@ private[offline] class TimeBasedHdfsPathGenerator(pathChecker: PathChecker) {
     val chronUnit = OfflineDateTimeUtils.dateTimeResolutionToChronoUnit(dateTimeResolution)
     val numUnits = chronUnit.between(factDataStartTime, factDataEndTime).toInt
     val formatter = DateTimeFormatter.ofPattern(pathInfo.datePathPattern).withZone(OfflineDateTimeUtils.DEFAULT_ZONE_ID)
+    val ss = SparkSession.builder().getOrCreate()
     val filePaths = (0 until numUnits)
-      .map(offset => pathInfo.basePath + formatter.format(factDataStartTime.plus(offset, chronUnit)) + postfixPath).distinct
+      .map(offset => {
+        val time = formatter.format(factDataStartTime.plus(offset, chronUnit))
+        val withTimePath = if (pathInfo.basePath.contains(pathInfo.datePathPattern)) {
+          pathInfo.basePath.replaceAll(pathInfo.datePathPattern, time)
+        } else {
+          pathInfo.basePath + time + postfixPath
+        }
+        // Resolve any #LATEST in the generated path; fall back to the unresolved path on failure.
+        Try {
+          resolveLatest(ss, withTimePath, None)
+        } match {
+          case Success(resolvedPath) => resolvedPath
+          case Failure(_) => withTimePath
+        }
+      }).distinct
 
     if (ignoreMissingFiles) {
       filePaths.filter(filePath => pathChecker.exists(filePath) && pathChecker.nonEmpty(filePath))
diff --git a/feathr-impl/src/test/resources/slidingWindowAgg/localAnchorTestObsData2.avro.json b/feathr-impl/src/test/resources/slidingWindowAgg/localAnchorTestObsData2.avro.json
new file mode 100644
index 000000000..0f5f209a8
--- /dev/null
+++ b/feathr-impl/src/test/resources/slidingWindowAgg/localAnchorTestObsData2.avro.json
@@ -0,0 +1,119 @@
+{
+  "schema": {
+    "type": "record",
+    "name": "NTVObs",
+    "doc": "Daily or multi-day aggregated activity features generated from similar data sources.",
data sources.", + "namespace": "com.linkedin.feathr.offline.data", + "fields": [ + { + "name": "x", + "type": [ + "null", + "string" + ], + "doc": "" + }, + { + "name": "y", + "type": "string", + "doc": "" + }, + { + "name": "timestamp", + "type": "string" + }, + { + "name": "passthroughFeatures", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "features", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "term", + "type": "string" + }, + { + "name": "value", + "type": "float" + } + ] + } + } + } + ] + }, + "data": [ + { + "x": { + "string": "1" + }, + "y": "a2", + "timestamp": "2018-05-03", + "passthroughFeatures": [ + { + "name": "f1f1", + "term": "f1t1", + "value": 12 + } + ] + }, + { + "x": { + "string": "a2" + }, + "y": "a1", + "timestamp": "2018-05-03", + "passthroughFeatures": [ + { + "name": "f1f1", + "term": "f1t1", + "value": 12 + } + ] + }, + { + "x": null, + "y": "a5", + "timestamp": "2018-05-03", + "passthroughFeatures": [ + { + "name": "f1f1", + "term": "f1t1", + "value": 12 + } + ] + }, + { + "x": null, + "y": "a8", + "timestamp": "2018-05-03", + "passthroughFeatures": [ + { + "name": "f1f1", + "term": "f1t1", + "value": 12 + } + ] + }, + { + "x": { + "string": "xyz" + }, + "y": "abc", + "timestamp": "2018-04-30", + "passthroughFeatures": [ + { + "name": "f2f2", + "term": "f2t2", + "value": 12 + } + ] + } + ] +} \ No newline at end of file diff --git a/feathr-impl/src/test/resources/slidingWindowAgg/localSWAAnchorTestFeatureData/daily/2018/05/01/postfixPath/05/part-00000-a5fbb15b-11b1-4a96-9fb0-28f7b77de928-c000.avro b/feathr-impl/src/test/resources/slidingWindowAgg/localSWAAnchorTestFeatureData/daily/2018/05/01/postfixPath/05/part-00000-a5fbb15b-11b1-4a96-9fb0-28f7b77de928-c000.avro new file mode 100644 index 000000000..2823d5087 Binary files /dev/null and b/feathr-impl/src/test/resources/slidingWindowAgg/localSWAAnchorTestFeatureData/daily/2018/05/01/postfixPath/05/part-00000-a5fbb15b-11b1-4a96-9fb0-28f7b77de928-c000.avro differ diff --git a/feathr-impl/src/test/resources/slidingWindowAgg/localSWAAnchorTestFeatureData/daily/2018/05/01/postfixPath/05/part-00001-a5fbb15b-11b1-4a96-9fb0-28f7b77de928-c000.avro b/feathr-impl/src/test/resources/slidingWindowAgg/localSWAAnchorTestFeatureData/daily/2018/05/01/postfixPath/05/part-00001-a5fbb15b-11b1-4a96-9fb0-28f7b77de928-c000.avro new file mode 100644 index 000000000..4b0549fdd Binary files /dev/null and b/feathr-impl/src/test/resources/slidingWindowAgg/localSWAAnchorTestFeatureData/daily/2018/05/01/postfixPath/05/part-00001-a5fbb15b-11b1-4a96-9fb0-28f7b77de928-c000.avro differ diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala index 7485b2af4..e6716dd64 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/SlidingWindowAggIntegTest.scala @@ -255,6 +255,69 @@ class SlidingWindowAggIntegTest extends FeathrIntegTest { assertEquals(row1f1f1, TestUtils.build1dSparseTensorFDSRow(Array("f1t1"), Array(12.0f))) } + + /** + * test SWA with lateralview parameters + */ + @Test + def testLocalAnchorSWAWithLatest: Unit = { + val df = runLocalFeatureJoinForTest( + joinConfigAsString = """ + | settings: { + | observationDataTimeSettings: { + | absoluteTimeRange: { + | startTime: "2018-05-01" + | endTime: "2018-05-03" + | timeFormat: "yyyy-MM-dd" + | } + | 
+          |  }
+          |  joinTimeSettings: {
+          |   timestampColumn: {
+          |     def: timestamp
+          |     format: "yyyy-MM-dd"
+          |   }
+          |  }
+          |}
+          |
+          |features: [
+          |  {
+          |    key: [x],
+          |    featureList: ["f1"]
+          |  }
+          |]
+        """.stripMargin,
+      featureDefAsString = """
+          |sources: {
+          |  swaSource: {
+          |    location: { path: "src/test/resources/slidingWindowAgg/localSWAAnchorTestFeatureData/daily/yyyy/MM/dd/postfixPath/#LATEST" }
+          |    timePartitionPattern: "yyyy/MM/dd"
+          |  }
+          |}
+          |
+          |anchors: {
+          |  swaAnchor: {
+          |    source: "swaSource"
+          |    key: "id"
+          |    features: {
+          |      f1: {
+          |        def: "int_value"
+          |        window: 3d
+          |        aggregation: LATEST
+          |      }
+          |    }
+          |  }
+          |}
        """.stripMargin,
+      "slidingWindowAgg/localAnchorTestObsData2.avro.json").data
+    df.show()
+
+    // validate the joined feature values
+    val featureList = df.collect().sortBy(row => if (row.get(0) != null) row.getAs[String]("x") else "null")
+    val row0 = featureList(0)
+    val row0f1 = row0.getAs[Float]("f1")
+    assertEquals(row0f1, 1.0f)
+  }
+
   /**
    * test SWA with lateralview parameters and ADD_DEFAULT_COL_FOR_MISSING_DATA flag set
    */
diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/pathutil/TestTimeBasedHdfsPathGenerator.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/pathutil/TestTimeBasedHdfsPathGenerator.scala
index 2576a6b6a..1077ed08e 100644
--- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/pathutil/TestTimeBasedHdfsPathGenerator.scala
+++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/source/pathutil/TestTimeBasedHdfsPathGenerator.scala
@@ -24,6 +24,18 @@ class TestTimeBasedHdfsPathGenerator extends TestFeathr with MockitoSugar{
     verifyNoMoreInteractions(mockPathChecker)
   }
 
+  @Test(description = "test generate daily file list with latest")
+  def testGenerateDailyFilesWithLatest() : Unit = {
+    val mockPathChecker = mock[PathChecker]
+    val pathGenerator = new TimeBasedHdfsPathGenerator(mockPathChecker)
+
+    val pathInfo = PathInfo("src/test/resources/generation/#LATEST/yyyy/MM/dd", DateTimeResolution.DAILY, "yyyy/MM/dd")
+    val interval = TestUtils.createDailyInterval("2019-05-18", "2019-05-20")
+    val pathList = pathGenerator.generate(pathInfo, interval, false)
+    assertEquals(pathList.toList, List("src/test/resources/generation/hourly/2019/05/18", "src/test/resources/generation/hourly/2019/05/19"))
+    verifyNoMoreInteractions(mockPathChecker)
+  }
+
   @Test(description = "test generate hourly file list")
   def testGenerateHourlyFiles() : Unit = {
     val mockPathChecker = mock[PathChecker]