DataPathHandler Bug Fix #531

Merged · 2 commits · Aug 3, 2022
@@ -101,7 +101,7 @@ private[offline] object DataSourceAccessor {
     failOnMissingPartition: Boolean,
     addTimestampColumn: Boolean,
     dataLoaderHandlers: List[DataLoaderHandler]): DataSourceAccessor = {
-    val pathChecker = PathChecker(ss)
+    val pathChecker = PathChecker(ss, dataLoaderHandlers)
     val fileLoaderFactory = DataLoaderFactory(ss = ss, dataLoaderHandlers = dataLoaderHandlers)
     val partitionLimiter = new PartitionLimiter(ss)
     val pathAnalyzer = new TimeBasedHdfsPathAnalyzer(pathChecker, dataLoaderHandlers)

@@ -1,13 +1,14 @@
package com.linkedin.feathr.offline.source.pathutil

 import com.linkedin.feathr.offline.util.{HdfsUtils, LocalFeatureJoinUtils, SourceUtils}
+import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
import org.apache.hadoop.conf.Configuration

/**
* path checker for local test files.
* @param hadoopConf hadoop configuration
*/
-private[offline] class LocalPathChecker(hadoopConf: Configuration) extends PathChecker {
+private[offline] class LocalPathChecker(hadoopConf: Configuration, dataLoaderHandlers: List[DataLoaderHandler]) extends PathChecker {

   private val TEST_AVRO_JSON_FILE = "/data.avro.json"

@@ -20,13 +21,33 @@ private[offline] class LocalPathChecker(hadoopConf: Configuration) extends PathChecker {
     LocalFeatureJoinUtils.getMockPathIfExist(path, hadoopConf, None).isDefined
   }

+  /**
+   * check whether the input path is an external data source. Needs to have separate function, as there are class conflicts with Breaks.getClass
+   * @param path input path.
+   * @return true if the path is an external data source.
+   */
+  def isExternalDataSource(path: String): Boolean = {
+    import scala.util.control.Breaks._
+
+    var isExternalDataSourceFlag: Boolean = false
+    breakable {
+      for(dataLoaderHandler <- dataLoaderHandlers) {
+        if (dataLoaderHandler.validatePath(path)) {
+          isExternalDataSourceFlag = true
+          break
+        }
+      }
+    }
+    isExternalDataSourceFlag
+  }
+
   /**
    * check whether the input path exists. It will try different formats for local test.
    * @param path input path.
    * @return true if the path exists.
    */
   override def exists(path: String): Boolean = {
-    if (HdfsUtils.exists(path)) return true
+    if (!isExternalDataSource(path) && HdfsUtils.exists(path)) return true
     if (LocalFeatureJoinUtils.getMockPathIfExist(path, hadoopConf, None).isDefined) return true
     if (getClass.getClassLoader.getResource(path) != null) return true
     if (getClass.getClassLoader.getResource(path + TEST_AVRO_JSON_FILE) != null) return true
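For readers of this diff: the breakable loop in isExternalDataSource simply short-circuits on the first handler whose validatePath accepts the path. A functionally equivalent sketch (not part of this PR) using the standard short-circuiting collection method would be:

  // Sketch only, not part of the PR: same behaviour as isExternalDataSource above,
  // expressed with the short-circuiting `exists` from the Scala collections API.
  def isExternalDataSourceSketch(path: String): Boolean =
    dataLoaderHandlers.exists(_.validatePath(path))

The PR keeps the explicit Breaks-based loop in a separate method for the class-conflict reason noted in its doc comment.
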
@@ -1,6 +1,7 @@
 package com.linkedin.feathr.offline.source.pathutil

 import org.apache.spark.sql.SparkSession
+import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler

/**
* Common path utility functions
@@ -25,8 +26,8 @@ private[offline] trait PathChecker {
  * It will construct a specific path checker according to the spark session.
  */
 private[offline] object PathChecker {
-  def apply(ss : SparkSession): PathChecker = {
-    if (ss.sparkContext.isLocal) new LocalPathChecker(ss.sparkContext.hadoopConfiguration)
+  def apply(ss : SparkSession, dataLoaderHandlers: List[DataLoaderHandler]): PathChecker = {
+    if (ss.sparkContext.isLocal) new LocalPathChecker(ss.sparkContext.hadoopConfiguration, dataLoaderHandlers)
     else new HdfsPathChecker()
   }
 }
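As a usage sketch of the new factory signature (the handler list below is a placeholder; in local mode the factory returns a LocalPathChecker that consults the handlers, otherwise an HdfsPathChecker):

  // Minimal sketch, assuming an existing SparkSession `ss`.
  val handlers: List[DataLoaderHandler] = List()            // placeholder: caller-registered handlers
  val pathChecker: PathChecker = PathChecker(ss, handlers)  // new two-argument apply from this PR
  // With the change to LocalPathChecker, paths claimed by a handler skip the HdfsUtils.exists probe.
  pathChecker.exists("src/test/resources/anchor1-source.csv")
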
@@ -97,7 +97,7 @@ private[offline] class AnchorToDataSourceMapper(dataPathHandlers: List[DataPathHandler]

     // Only file-based source has real "path", others are just single dataset
     val adjustedObsTimeRange = if (factDataSource.location.isFileBasedLocation()) {
-      val pathChecker = PathChecker(ss)
+      val pathChecker = PathChecker(ss, dataLoaderHandlers)
       val pathAnalyzer = new TimeBasedHdfsPathAnalyzer(pathChecker, dataLoaderHandlers)
       val pathInfo = pathAnalyzer.analyze(factDataSource.path)
       if (pathInfo.dateTimeResolution == DateTimeResolution.DAILY)

@@ -106,7 +106,7 @@ private[offline] object SourceUtils {
     sourceFormatType match {
       case SourceFormatType.FIXED_PATH => Seq(HdfsUtils.getLatestPath(sourcePath, ss.sparkContext.hadoopConfiguration))
       case SourceFormatType.TIME_PATH =>
-        val pathChecker = PathChecker(ss)
+        val pathChecker = PathChecker(ss, dataLoaderHandlers)
         val pathGenerator = new TimeBasedHdfsPathGenerator(pathChecker)
         val pathAnalyzer = new TimeBasedHdfsPathAnalyzer(pathChecker, dataLoaderHandlers)
         val pathInfo = pathAnalyzer.analyze(sourcePath)

@@ -1,9 +1,13 @@
 package com.linkedin.feathr.offline.source.pathutil

 import com.linkedin.feathr.offline.TestFeathr
+import com.linkedin.feathr.offline.source.dataloader.DataLoaderHandler
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.DataFrame
+import org.apache.hadoop.mapred.JobConf
 import org.mockito.Mockito.when
 import org.scalatest.mockito.MockitoSugar
 import org.testng.Assert.{assertEquals, assertTrue}
@@ -14,13 +18,13 @@ class TestPathChecker extends TestFeathr with MockitoSugar {

@Test(description = "test creation of PathChecker")
def testCreateDataSourcePathChecker(): Unit = {
val localPathChecker = PathChecker(ss)
val localPathChecker = PathChecker(ss, List())
assertTrue(localPathChecker.isInstanceOf[LocalPathChecker])
val mockSparkSession = mock[SparkSession]
val mockSparkContext = mock[SparkContext]
when(mockSparkSession.sparkContext).thenReturn(mockSparkContext)
when(mockSparkContext.isLocal).thenReturn(false)
val defaultPathChecker = PathChecker(mockSparkSession)
val defaultPathChecker = PathChecker(mockSparkSession, List())
assertTrue(defaultPathChecker.isInstanceOf[HdfsPathChecker])
}

@@ -35,10 +39,36 @@ class TestPathChecker extends TestFeathr with MockitoSugar {

@Test(description = "test exists method for LocalPathChecker")
def testLocalPathCheckerExists() : Unit = {
val localPathChecker = new LocalPathChecker(new Configuration())
val localPathChecker = new LocalPathChecker(new Configuration(), List())
assertEquals(localPathChecker.exists("src/test/resources/anchor1-source.csv"), true)
assertEquals(localPathChecker.exists("anchor1-source.csv"), true)
assertEquals(localPathChecker.exists("generation/daily/2019/05/19"), true)
assertEquals(localPathChecker.exists("non-existing_path"), false)
}
+
+  @Test(description = "test isExternalDataSource method for LocalPathChecker")
+  def testLocalPathCheckerIsExternalDataSource() : Unit = {
+    val mockCreateDataFrame = (path: String, daliParameters: Map[String, String], jobConf: JobConf) => mock[DataFrame]
+
+    val mockUnionDataFrame = (
+      paths: Seq[String],
+      parameters: Map[String, String],
+      jobConf: JobConf) => mock[DataFrame]
+
+    val mockWriteDataFrame = (
+      df: DataFrame,
+      path: String,
+      parameters: Map[String, String]) => {}
+
+    val mockDataLoaderHandler = DataLoaderHandler(
+      validatePath = (path: String) => path == "xyz://",
+      createDataFrame = mockCreateDataFrame,
+      createUnionDataFrame = mockUnionDataFrame,
+      writeDataFrame = mockWriteDataFrame,
+    )
+    val mockDataLoaderHandlers = List(mockDataLoaderHandler)
+    val localPathChecker = new LocalPathChecker(new Configuration(), mockDataLoaderHandlers)
+    assertTrue(localPathChecker.isExternalDataSource("xyz://"))
+    assertTrue(!localPathChecker.isExternalDataSource("file://"))
+  }
}
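
With a handler like the mock above registered, the new guard in LocalPathChecker.exists means a path claimed by a handler is no longer probed through HdfsUtils. A small behavioural sketch, reusing the hypothetical xyz:// scheme from the test (not an assertion from this PR's tests):

  // Sketch of the behaviour implied by the new guard in LocalPathChecker.exists.
  val checker = new LocalPathChecker(new Configuration(), mockDataLoaderHandlers)
  checker.isExternalDataSource("xyz://")  // true: the mock handler's validatePath matches
  // exists("xyz://") skips the HdfsUtils.exists check and falls back to the local
  // mock-path and classpath lookups, so it returns false unless such a copy exists.
  checker.exists("xyz://")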