diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala
index b9b650eba..98f01f6f8 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/BatchDataLoader.scala
@@ -97,7 +97,8 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation,
         }
         df
       } catch {
-        case _: Throwable =>
+        case e: Throwable =>
+          e.printStackTrace() // Surface the original failure before trying the Avro JSON and CSV fallbacks
           try {
             new AvroJsonDataLoader(ss, dataPath + "/data.avro.json").loadDataFrame()
           } catch {
@@ -106,6 +107,7 @@ private[offline] class BatchDataLoader(ss: SparkSession, location: DataLocation,
               ss.read.format("csv").option("header", "true").option("delimiter", csvDelimiterOption).load(dataPath)
             } catch {
               case e: Exception =>
+                e.printStackTrace()
                 // If data loading from source failed, retry it automatically, as it might due to data source still being written into.
                 log.info(s"Loading ${location} failed, retrying for ${retry}-th time..")
                 if (retry > 0) {
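For context, the two BatchDataLoader hunks above sit inside a nested fallback chain that the diff only shows in fragments. Below is a condensed sketch of that control flow, not part of the patch: the signatures are simplified, `loadWithFallbacks` and its by-name `loadPrimary` parameter are hypothetical stand-ins for the format-specific read the first hunk wraps, and the real method retries with a wait while `retry > 0` instead of rethrowing.

import org.apache.spark.sql.{DataFrame, SparkSession}
import com.linkedin.feathr.offline.source.dataloader.AvroJsonDataLoader

// Condensed sketch of BatchDataLoader's fallback chain (simplified; the real
// method also drives the retry loop shown in the second hunk).
def loadWithFallbacks(ss: SparkSession, dataPath: String, csvDelimiterOption: String)(
    loadPrimary: => DataFrame): DataFrame =
  try {
    loadPrimary // hypothetical stand-in for the format-specific read
  } catch {
    case e: Throwable =>
      e.printStackTrace() // added by this patch: surface the root cause of the primary failure
      try {
        // Fallback 1: Avro JSON file under the data path
        new AvroJsonDataLoader(ss, dataPath + "/data.avro.json").loadDataFrame()
      } catch {
        case _: Throwable =>
          try {
            // Fallback 2: CSV with the configured delimiter
            ss.read
              .format("csv")
              .option("header", "true")
              .option("delimiter", csvDelimiterOption)
              .load(dataPath)
          } catch {
            case e: Exception =>
              e.printStackTrace() // added by this patch: log before the retry logic runs
              throw e // simplification: the real method retries while retry > 0
          }
      }
  }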
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/hdfs/FileFormat.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/hdfs/FileFormat.scala
index b405b0728..5b79b0b35 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/hdfs/FileFormat.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/source/dataloader/hdfs/FileFormat.scala
@@ -26,6 +26,8 @@ object FileFormat {
   // Snowflake type
   val SNOWFLAKE = "SNOWFLAKE"
 
+  val UNITY_CATALOG = "UNITY_CATALOG"
+
   private val AVRO_DATASOURCE = "avro"
   // Use Spark native orc reader instead of hive-orc since Spark 2.3
   private val ORC_DATASOURCE = "orc"
@@ -47,6 +49,7 @@ object FileFormat {
       case p if p.endsWith(".avro") => AVRO
       case p if p.startsWith("jdbc:") => JDBC
       case p if p.startsWith("snowflake:") => SNOWFLAKE
+      case p if p.startsWith("unity:") => UNITY_CATALOG
       case _ =>
         // if we cannot tell the file format from the file extensions, we should read from `spark.feathr.inputFormat` to get the format that's sepcified by user.
         if (ss.conf.get("spark.feathr.inputFormat","").nonEmpty) ss.conf.get("spark.feathr.inputFormat") else PATHLIST
@@ -85,6 +88,7 @@ object FileFormat {
       case p if p.endsWith(".avro") => AVRO
       case p if p.startsWith("jdbc:") => JDBC
       case p if p.startsWith("snowflake:") => SNOWFLAKE
+      case p if p.startsWith("unity:") => UNITY_CATALOG
       case _ =>
         // if we cannot tell the file format from the file extensions, we should read from `spark.feathr.inputFormat` to get the format that's sepcified by user.
         dataIOParameters.getOrElse(DATA_FORMAT, ss.conf.get("spark.feathr.inputFormat", AVRO)).toUpperCase
@@ -96,7 +100,7 @@ object FileFormat {
   def loadHdfsDataFrame(format: String, existingHdfsPaths: Seq[String]): DataFrame = {
     // Get csvDelimiterOption set with spark.feathr.inputFormat.csvOptions.sep and check if it is set properly (Only for CSV and TSV)
-    val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))
+    val csvDelimiterOption = checkDelimiterOption(ss.sqlContext.getConf("spark.feathr.inputFormat.csvOptions.sep", ","))
 
     val df = format match {
       case CSV =>
@@ -112,6 +116,11 @@ object FileFormat {
         JdbcUtils.loadDataFrame(ss, existingHdfsPaths.head)
       case SNOWFLAKE =>
         SnowflakeUtils.loadDataFrame(ss, existingHdfsPaths.head)
+
+      case UNITY_CATALOG =>
+        val pathHead = existingHdfsPaths.head
+        val unityCatalogTable = pathHead.replaceFirst("unity:", "")
+        ss.table(unityCatalogTable)
       case _ =>
         // Allow dynamic config of the file format if users want to use one
         if (ss.conf.getOption("spark.feathr.inputFormat").nonEmpty) ss.read.format(ss.conf.get("spark.feathr.inputFormat")).load(existingHdfsPaths: _*)
diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/AclCheckUtils.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/AclCheckUtils.scala
index 59098db3e..ca17359ac 100644
--- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/AclCheckUtils.scala
+++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/util/AclCheckUtils.scala
@@ -33,7 +33,7 @@ private[offline] object AclCheckUtils {
   // Check read authorization on a path string
   def checkReadAuthorization(conf: Configuration, pathName: String): Try[Unit] = {
     // no way to check jdbc auth yet
-    if (pathName.startsWith("jdbc:")) {
+    if (pathName.startsWith("jdbc:") || pathName.startsWith("unity:")) {
       Success(())
     } else {
       val path = new Path(pathName)
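Taken together, the FileFormat and AclCheckUtils changes route a `unity:`-prefixed location around the HDFS ACL check (mirroring the existing `jdbc:` carve-out, since the location is not a filesystem path) and resolve it through the Spark session catalog. A minimal sketch of the intended round trip, not part of the patch: it assumes a Unity-Catalog-enabled SparkSession, and the table name `main.feathr.user_features` is hypothetical.

import org.apache.spark.sql.{DataFrame, SparkSession}

val ss: SparkSession = SparkSession.builder().getOrCreate()

// Hypothetical Unity Catalog location: the "unity:" scheme plus a
// three-part <catalog>.<schema>.<table> name.
val location = "unity:main.feathr.user_features"

// Detection mirrors FileFormat.getType: a "unity:" prefix selects UNITY_CATALOG.
val format = if (location.startsWith("unity:")) "UNITY_CATALOG" else "AVRO"

// Loading mirrors the new UNITY_CATALOG branch in loadHdfsDataFrame: strip the
// scheme prefix and read the table through the session catalog (ss.table).
val df: DataFrame = format match {
  case "UNITY_CATALOG" => ss.table(location.replaceFirst("unity:", ""))
  case _               => ss.read.load(location)
}

df.show(5)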