[DOP-13846] - implement XML.parse_column #269

Merged
7 commits merged on May 3, 2024
Changes from 3 commits
1 change: 1 addition & 0 deletions docs/changelog/next_release/269.feature.rst
@@ -0,0 +1 @@
Add ``XML.parse_column`` method for handling XML data within Spark. This method allows for direct parsing of XML strings into structured Spark DataFrame columns.
53 changes: 53 additions & 0 deletions docs/connection/db_connection/kafka/format_handling.rst
@@ -242,3 +242,56 @@ To serialize structured data into Avro format and write it back to a Kafka topic
# | 1|[02 02 02 08 76 6... (binary data)] |
# | 2|[02 04 02 08 76 6... (binary data)] |
# +---+------------------------------------+

XML Format Handling
-------------------

Handling XML data from Kafka involves parsing string representations of XML into structured Spark DataFrame columns.

``DBReader``
~~~~~~~~~~~~

To process XML-formatted data from Kafka, use the :obj:`XML.parse_column <onetl.file.format.xml.XML.parse_column>` method. It converts a column containing XML strings directly into a structured (struct) Spark column, using the specified schema.

.. code-block:: python

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

from onetl.db import DBReader
from onetl.file.format import XML
from onetl.connection import Kafka

spark = SparkSession.builder.appName("KafkaXMLExample").getOrCreate()

kafka = Kafka(...)
xml = XML(row_tag="person")

reader = DBReader(
connection=kafka,
source="topic_name",
)
df = reader.run()

df.show()
# +----+--------------------------------------------------------------------------------------------+----------+---------+------+-----------------------+-------------+
# |key |value                                                                                       |topic     |partition|offset|timestamp              |timestampType|
# +----+--------------------------------------------------------------------------------------------+----------+---------+------+-----------------------+-------------+
# |[31]|"<?xml version=\"1.0\" encoding=\"UTF-8\"?><person><name>Alice</name><age>20</age></person>"|topicXML  |0        |0     |2024-04-24 13:02:25.911|0            |
# |[32]|"<?xml version=\"1.0\" encoding=\"UTF-8\"?><person><name>Bob</name><age>25</age></person>"  |topicXML  |0        |1     |2024-04-24 13:02:25.922|0            |
# +----+--------------------------------------------------------------------------------------------+----------+---------+------+-----------------------+-------------+

xml_schema = StructType(
[
StructField("name", StringType(), nullable=True),
StructField("age", IntegerType(), nullable=True),
]
)
parsed_xml_df = df.select(xml.parse_column("value", xml_schema))
parsed_xml_df.show()
# +-----------+
# |value      |
# +-----------+
# |{Alice, 20}|
# |{Bob, 25}  |
# +-----------+
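
The parsed column is a struct aliased to the source column name (``value`` here), so individual fields can then be selected by name. A short continuation sketch of the example above (the output shown is illustrative):

.. code-block:: python

    parsed_xml_df.select("value.name", "value.age").show()
    # +-----+---+
    # |name |age|
    # +-----+---+
    # |Alice|20 |
    # |Bob  |25 |
    # +-----+---+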
2 changes: 1 addition & 1 deletion docs/file_df/file_formats/xml.rst
@@ -6,4 +6,4 @@ XML
.. currentmodule:: onetl.file.format.xml

.. autoclass:: XML
:members: get_packages
:members: get_packages, parse_column
97 changes: 96 additions & 1 deletion onetl/file/format/xml.py
@@ -19,7 +19,8 @@
from onetl.hooks import slot, support_hooks

if TYPE_CHECKING:
from pyspark.sql import SparkSession
from pyspark.sql import Column, SparkSession
from pyspark.sql.types import StructType


PROHIBITED_OPTIONS = frozenset(
@@ -226,3 +227,97 @@ def check_if_supported(self, spark: SparkSession) -> None:
if log.isEnabledFor(logging.DEBUG):
log.debug("Missing Java class", exc_info=e, stack_info=True)
raise ValueError(msg) from e

def parse_column(self, column: str | Column, schema: StructType) -> Column:
"""
Parses an XML string column into a structured Spark SQL column using the ``from_xml`` function
provided by the `Databricks Spark XML library <https://github.com/databricks/spark-xml#pyspark-notes>`_
based on the provided schema.

.. note::

This method assumes that the ``spark-xml`` package is installed and properly configured within your Spark environment.
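
For example, the required package can be passed to the Spark session when it is created. A minimal sketch; the Spark version below is illustrative and should match your environment:

.. code-block:: python

    from pyspark.sql import SparkSession

    from onetl.file.format import XML

    # spark-xml Maven coordinates matching the Spark version
    maven_packages = XML.get_packages(spark_version="3.5.1")
    spark = (
        SparkSession.builder.appName("spark-app-name")
        .config("spark.jars.packages", ",".join(maven_packages))
        .getOrCreate()
    )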

.. note::

This method does not support XML strings whose root tag differs from the configured ``rowTag``. If your XML data wraps multiple row tags in such a root tag, preprocess the string to remove or ignore the root tag before parsing, for example:

.. code-block:: xml

<books>
<book><title>Book One</title><author>Author A</author></book>
<book><title>Book Two</title><author>Author B</author></book>
</books>

Before applying this method, such an XML field should be preprocessed into:

.. code-block:: xml

<book><title>Book One</title><author>Author A</author></book>
<book><title>Book Two</title><author>Author B</author></book>
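
A minimal preprocessing sketch (assuming the raw XML sits in a Spark column named ``xml_string``; the column name and the regular expression are illustrative):

.. code-block:: python

    from pyspark.sql.functions import regexp_replace

    # strip the outer <books> root tag, keeping only the <book> row tags
    df = df.withColumn("xml_string", regexp_replace("xml_string", "</?books>", ""))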



Parameters
----------
column : str | Column
The name of the column or the Column object containing XML strings to parse.

schema : StructType
The expected schema of the data parsed from the XML string.

Returns
-------
Column
A new Column object with data parsed from XML string to the specified structured format.

Examples
--------
.. code-block:: python

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

from onetl.file.format import XML

spark = SparkSession.builder.appName("XMLParsingExample").getOrCreate()
schema = StructType(
[
StructField("author", StringType(), nullable=True),
StructField("title", StringType(), nullable=True),
StructField("genre", StringType(), nullable=True),
StructField("price", IntegerType(), nullable=True),
]
)
xml_processor = XML(row_tag="book")

data = [
(
"<book><author>Austen, Jane</author><title>Pride and Prejudice</title><genre>romance</genre><price>19</price></book>",
)
]
df = spark.createDataFrame(data, ["xml_string"])

parsed_df = df.select(xml_processor.parse_column("xml_string", schema=schema))
parsed_df.show()

"""
from pyspark.sql import Column, SparkSession # noqa: WPS442

spark = SparkSession._instantiatedSession # noqa: WPS437
self.check_if_supported(spark)

from pyspark.sql.column import _to_java_column # noqa: WPS450
from pyspark.sql.functions import col

if isinstance(column, Column):
column_name, column = column._jc.toString(), column.cast("string") # noqa: WPS437
else:
column_name, column = column, col(column).cast("string")

java_column = _to_java_column(column)
java_schema = spark._jsparkSession.parseDataType(schema.json()) # noqa: WPS437
scala_options = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap( # noqa: WPS219, WPS437
self.dict(),
)
jc = spark._jvm.com.databricks.spark.xml.functions.from_xml( # noqa: WPS219, WPS437
java_column,
java_schema,
scala_options,
)
return Column(jc).alias(column_name)
@@ -11,9 +11,11 @@
from onetl.file.format import XML

try:
from tests.util.assert_df import assert_equal_df
from pyspark.sql.functions import col

from tests.util.assert_df import assert_equal_df, assert_subset_df
except ImportError:
pytest.skip("Missing pandas", allow_module_level=True)
pytest.skip("Missing pandas or pyspark", allow_module_level=True)

pytestmark = [pytest.mark.local_fs, pytest.mark.file_df_connection, pytest.mark.connection, pytest.mark.xml]

@@ -166,3 +168,51 @@ def test_xml_reader_with_attributes(
assert read_df.count()
assert read_df.schema == expected_xml_attributes_df.schema
assert_equal_df(read_df, expected_xml_attributes_df, order_by="id")


@pytest.mark.parametrize("column_type", [str, col])
def test_xml_parse_column(
spark,
local_fs_file_df_connection_with_path_and_files,
expected_xml_attributes_df,
file_df_dataframe,
file_df_schema,
column_type,
):
from pyspark.sql.functions import expr

from onetl.file.format import XML

spark_version = get_spark_version(spark)
if spark_version.major < 3:
pytest.skip("XML files are supported on Spark 3.x only")

file_df_connection, source_path, _ = local_fs_file_df_connection_with_path_and_files
xml_file_path = source_path / "xml" / "without_compression" / "file.xml"

with open(xml_file_path) as file:
xml_data = file.read()

df = spark.createDataFrame([(xml_data,)], ["xml_string"])
# remove the <root> tag from the XML string
df = df.withColumn("xml_string", expr("regexp_replace(xml_string, '^<root>|</root>$', '')"))

xml = XML(row_tag="item")
parsed_df = df.select(xml.parse_column(column_type("xml_string"), schema=file_df_schema))
transformed_df = parsed_df.select(
"xml_string.id",
"xml_string.str_value",
"xml_string.int_value",
"xml_string.date_value",
"xml_string.datetime_value",
"xml_string.float_value",
)
expected_df_selected = expected_xml_attributes_df.select(
"id",
"str_value",
"int_value",
"date_value",
"datetime_value",
"float_value",
)
assert_subset_df(transformed_df, expected_df_selected)