Skip to content

Commit

Permalink
[SPARK-45912][SQL] Enhancement of XSDToSchema API: Change to HDFS API…
Browse files Browse the repository at this point in the history
… for cloud storage accessibility

### What changes were proposed in this pull request?

Previously, `XSDToSchema` read XSD files through `java.nio.file.Path`, which limited it to the local file system. By switching to the Hadoop-compatible file system API (`org.apache.hadoop.fs.Path`), the XSDToSchema function can now access files in cloud storage.

### Why are the changes needed?

We want to enable the XSDToSchema function to access files in cloud storage.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43789 from shujingyang-db/xsd_api.

Authored-by: Shujing Yang <shujing.yang@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
shujingyang-db authored and HyukjinKwon committed Nov 17, 2023
1 parent f6b670a commit fcf340a
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.catalyst.xml

import java.io.{File, FileInputStream, InputStream}
import javax.xml.XMLConstants
import javax.xml.transform.stream.StreamSource
import javax.xml.validation.{Schema, SchemaFactory}
Expand All @@ -25,28 +26,18 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.SparkFiles
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
import org.apache.spark.internal.Logging

/**
* Utilities for working with XSD validation.
*/
private[sql] object ValidatorUtil {
private[sql] object ValidatorUtil extends Logging{
// Parsing XSDs may be slow, so cache them by path:

private val cache = CacheBuilder.newBuilder().softValues().build(
new CacheLoader[String, Schema] {
override def load(key: String): Schema = {
val in = try {
// Handle case where file exists as specified
val fs = Utils.getHadoopFileSystem(key, SparkHadoopUtil.get.conf)
fs.open(new Path(key))
} catch {
case _: Throwable =>
// Handle case where it was added with sc.addFile
val addFileUrl = SparkFiles.get(key)
val fs = Utils.getHadoopFileSystem(addFileUrl, SparkHadoopUtil.get.conf)
fs.open(new Path(addFileUrl))
}
val in = openSchemaFile(new Path(key))
try {
val schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
schemaFactory.newSchema(new StreamSource(in))
Expand All @@ -56,6 +47,25 @@ private[sql] object ValidatorUtil {
}
})

/**
 * Opens the XSD located at `xsdPath` and returns a stream over its contents.
 *
 * The path is first opened through the Hadoop file system that `xsdPath` resolves to.
 * If that attempt fails with a non-fatal error, the path is looked up among the files
 * registered through `sc.addFile` and, when present, read from the local file system.
 * If the fallback file does not exist either, the failure from the first attempt is
 * rethrown unchanged.
 *
 * The returned stream is open; this method does not close it.
 *
 * @param xsdPath path of the XSD to open
 * @return an input stream over the XSD contents
 */
def openSchemaFile(xsdPath: Path): InputStream = {
  try {
    // Handle case where file exists as specified
    val fs = xsdPath.getFileSystem(SparkHadoopUtil.get.conf)
    fs.open(xsdPath)
  } catch {
    // Only recover from non-fatal failures. Fatal errors (OutOfMemoryError, linkage errors,
    // control throwables, ...) must propagate untouched instead of being logged as a
    // missing file and triggering the fallback lookup.
    case scala.util.control.NonFatal(e) =>
      // Handle case where it was added with sc.addFile
      // When they are added via sc.addFile, they are always downloaded to local file system
      logInfo(s"$xsdPath was not found, falling back to look up files added by Spark")
      val f = new File(SparkFiles.get(xsdPath.toString))
      if (f.exists()) {
        new FileInputStream(f)
      } else {
        // Surface the original failure so callers see why the primary lookup failed.
        throw e
      }
  }
}

/**
* Parses the XSD at the given local path and caches it.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,53 +16,42 @@
*/
package org.apache.spark.sql.execution.datasources.xml

import java.io.{File, FileInputStream, InputStreamReader, StringReader}
import java.nio.charset.StandardCharsets
import java.nio.file.Path
import java.io.StringReader

import scala.jdk.CollectionConverters._

import org.apache.hadoop.fs.Path
import org.apache.hadoop.shaded.org.jline.utils.InputStreamReader
import org.apache.ws.commons.schema._
import org.apache.ws.commons.schema.constants.Constants

import org.apache.spark.sql.catalyst.xml.XmlOptions
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.xml.{ValidatorUtil, XmlOptions}
import org.apache.spark.sql.types._

/**
* Utility to generate a Spark schema from an XSD. Not all XSD schemas are simple tabular schemas,
* so not all elements or XSDs are supported.
*/
object XSDToSchema {
object XSDToSchema extends Logging{

/**
* Reads a schema from an XSD file.
* Reads a schema from an XSD path.
* Note that if the schema consists of one complex parent type which you want to use as
* the row tag schema, then you will need to extract the schema of the single resulting
* struct in the resulting StructType, and use its StructType as your schema.
*
* @param xsdFile XSD file
* @param xsdPath XSD path
* @return Spark-compatible schema
*/
def read(xsdFile: File): StructType = {
def read(xsdPath: Path): StructType = {
val in = ValidatorUtil.openSchemaFile(xsdPath)
val xmlSchemaCollection = new XmlSchemaCollection()
xmlSchemaCollection.setBaseUri(xsdFile.getParent)
val xmlSchema = xmlSchemaCollection.read(
new InputStreamReader(new FileInputStream(xsdFile), StandardCharsets.UTF_8))

xmlSchemaCollection.setBaseUri(xsdPath.getParent.toString)
val xmlSchema = xmlSchemaCollection.read(new InputStreamReader(in))
getStructType(xmlSchema)
}

/**
* Reads a schema from an XSD file.
* Note that if the schema consists of one complex parent type which you want to use as
* the row tag schema, then you will need to extract the schema of the single resulting
* struct in the resulting StructType, and use its StructType as your schema.
*
* @param xsdFile XSD file
* @return Spark-compatible schema
*/
def read(xsdFile: Path): StructType = read(xsdFile.toFile)

/**
* Reads a schema from an XSD as a string.
* Note that if the schema consists of one complex parent type which you want to use as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.spark.sql.execution.datasources.xml.util

import java.nio.file.Paths
import java.io.FileNotFoundException

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.execution.datasources.xml.TestUtils._
import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
Expand All @@ -28,8 +30,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
private val resDir = "test-data/xml-resources/"

test("Basic parsing") {
val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "basket.xsd")
.replace("file:/", "/")))
val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "basket.xsd")))
val expectedSchema = buildSchema(
field("basket",
structField(
Expand All @@ -40,8 +41,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
}

test("Relative path parsing") {
val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "include-example/first.xsd")
.replace("file:/", "/")))
val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "include-example/first.xsd")))
val expectedSchema = buildSchema(
field("basket",
structField(
Expand All @@ -52,8 +52,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
}

test("Test schema types and attributes") {
val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "catalog.xsd")
.replace("file:/", "/")))
val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "catalog.xsd")))
val expectedSchema = buildSchema(
field("catalog",
structField(
Expand All @@ -76,23 +75,20 @@ class XSDToSchemaSuite extends SharedSparkSession {
}

test("Test xs:choice nullability") {
val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "choice.xsd")
.replace("file:/", "/")))
val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "choice.xsd")))
val expectedSchema = buildSchema(
field("el", structField(field("foo"), field("bar"), field("baz")), nullable = false))
assert(expectedSchema === parsedSchema)
}

test("Two root elements") {
val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "twoelements.xsd")
.replace("file:/", "/")))
val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "twoelements.xsd")))
val expectedSchema = buildSchema(field("bar", nullable = false), field("foo", nullable = false))
assert(expectedSchema === parsedSchema)
}

test("xs:any schema") {
val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "xsany.xsd")
.replace("file:/", "/")))
val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "xsany.xsd")))
val expectedSchema = buildSchema(
field("root",
structField(
Expand All @@ -117,17 +113,15 @@ class XSDToSchemaSuite extends SharedSparkSession {
}

test("Tests xs:long type / Issue 520") {
val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "long.xsd")
.replace("file:/", "/")))
val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "long.xsd")))
val expectedSchema = buildSchema(
field("test",
structField(field("userId", LongType, nullable = false)), nullable = false))
assert(parsedSchema === expectedSchema)
}

test("Test xs:decimal type with restriction[fractionalDigits]") {
val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir +
"decimal-with-restriction.xsd").replace("file:/", "/")))
val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "decimal-with-restriction.xsd")))
val expectedSchema = buildSchema(
field("decimal_type_3", DecimalType(12, 6), nullable = false),
field("decimal_type_1", DecimalType(38, 18), nullable = false),
Expand All @@ -137,8 +131,7 @@ class XSDToSchemaSuite extends SharedSparkSession {
}

test("Test ref attribute / Issue 617") {
val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir + "ref-attribute.xsd")
.replace("file:/", "/")))
val parsedSchema = XSDToSchema.read(new Path(testFile(resDir + "ref-attribute.xsd")))
val expectedSchema = buildSchema(
field(
"book",
Expand Down Expand Up @@ -166,8 +159,8 @@ class XSDToSchemaSuite extends SharedSparkSession {
}

test("Test complex content with extension element / Issue 554") {
val parsedSchema = XSDToSchema.read(Paths.get(testFile(resDir +
"complex-content-extension.xsd").replace("file:/", "/")))
val parsedSchema =
XSDToSchema.read(new Path(testFile(resDir + "complex-content-extension.xsd")))

val expectedSchema = buildSchema(
field(
Expand All @@ -184,4 +177,10 @@ class XSDToSchemaSuite extends SharedSparkSession {
)
assert(parsedSchema === expectedSchema)
}

// SPARK-45912: reading an XSD from a path expected not to exist must fail with
// FileNotFoundException.
test("SPARK-45912: Test XSDToSchema when open not found files") {
  val missingXsd = new Path("/path/not/found")
  intercept[FileNotFoundException](XSDToSchema.read(missingXsd))
}
}

0 comments on commit fcf340a

Please sign in to comment.