
Commit

[DOP-13840] - add JSON.parse_column and JSON.serialize_column methods (#257)

* [DOP-13840] - add JSON.parse_column, JSON.serialize_column methods

* [DOP-13840] - add tests

* [DOP-13840] - update documentation
maxim-lixakov committed Apr 23, 2024
1 parent 511a330 commit b296810
Showing 9 changed files with 242 additions and 88 deletions.
1 change: 1 addition & 0 deletions docs/changelog/next_release/257.feature.rst
@@ -0,0 +1 @@
Add ``JSON.parse_column`` and ``JSON.serialize_column`` methods to facilitate direct parsing of JSON strings into Spark DataFrame columns and serialization of structured DataFrame columns back into JSON strings.
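
Both methods are applied to DataFrame columns via ``df.select``. A minimal usage sketch (the DataFrame ``df`` and its ``json_string`` and ``struct_column`` columns are assumed here for illustration):

.. code-block:: python

    from pyspark.sql.types import ArrayType, IntegerType

    from onetl.file.format import JSON

    # Spark schema describing the structure encoded in the JSON string
    json_schema = ArrayType(IntegerType())

    df = df.select(
        # parse a JSON string column into a structured column
        JSON().parse_column("json_string", json_schema).alias("parsed_column"),
        # serialize a structured column back into a JSON string
        JSON().serialize_column(df.struct_column).alias("serialized_column"),
    )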
28 changes: 18 additions & 10 deletions docs/connection/db_connection/clickhouse/types.rst
@@ -336,13 +336,17 @@ Explicit type cast
~~~~~~~~~~~~

Use ``CAST`` or ``toJSONString`` to get column data as string in JSON format,
and then cast string column in resulting dataframe to proper type using `from_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.from_json.html>`_:

.. code:: python
To parse JSON columns in ClickHouse, use the :obj:`JSON.parse_column <onetl.file.format.json.JSON.parse_column>` method.

.. code-block:: python
from pyspark.sql.functions import from_json
from pyspark.sql.types import ArrayType, IntegerType
from onetl.file.format import JSON
from onetl.connection import ClickHouse
from onetl.db import DBReader
reader = DBReader(
connection=clickhouse,
columns=[
@@ -357,16 +361,22 @@
df = df.select(
df.id,
from_json(df.array_column, column_type).alias("array_column"),
JSON().parse_column("array_column", column_type),
)
``DBWriter``
~~~~~~~~~~~~

Convert dataframe column to JSON using `to_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.to_json.html>`_,
and write it as ``String`` column in Clickhouse:
For writing JSON data to ClickHouse, use the :obj:`JSON.serialize_column <onetl.file.format.json.JSON.serialize_column>` method to convert a DataFrame column to a JSON string and write it as a ``String`` column in ClickHouse.


.. code:: python
.. code-block:: python
from onetl.file.format import JSON
from onetl.connection import ClickHouse
from onetl.db import DBWriter
clickhouse = ClickHouse(...)
clickhouse.execute(
"""
@@ -379,11 +389,9 @@
""",
)
from pyspark.sql.functions import to_json
df = df.select(
df.id,
to_json(df.array_column).alias("array_column_json"),
JSON().serialize_column(df.array_column).alias("array_column_json"),
)
writer.run(df)
39 changes: 15 additions & 24 deletions docs/connection/db_connection/greenplum/types.rst
@@ -301,35 +301,29 @@ Explicit type cast
``DBReader``
~~~~~~~~~~~~

Unfortunately, it is not possible to cast unsupported column to some supported type on ``DBReader`` side:
Direct casting of Greenplum types is not supported by ``DBReader`` due to the connector's implementation specifics.

.. code-block:: python
DBReader(
reader = DBReader(
connection=greenplum,
# will fail
columns=["CAST(column AS text)"],
columns=["CAST(unsupported_column AS text)"],
)
This is related to Greenplum connector implementation. Instead of passing this ``CAST`` expression to ``SELECT`` query
as is, it performs type cast on Spark side, so this syntax is not supported.

But there is a workaround - create a view with casting unsupported column to ``text`` (or any other supported type).
But there is a workaround - create a view that casts the unsupported column to ``text`` (or any other supported type).
For example, you can use the `to_json <https://www.postgresql.org/docs/current/functions-json.html>`_ Postgres function to convert a column of any type to its string representation, and then parse this column on the Spark side using the :obj:`JSON.parse_column <onetl.file.format.json.JSON.parse_column>` method.

For example, you can use ``to_json`` Postgres function for convert column of any type to string representation.
You can then parse this column on Spark side using `from_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.from_json.html>`_:

.. code:: python
.. code-block:: python
from pyspark.sql.functions import from_json
from pyspark.sql.types import ArrayType, IntegerType
from onetl.connection import Greenplum
from onetl.db import DBReader
from onetl.file.format import JSON
greenplum = Greenplum(...)
# create view with proper type cast
greenplum.execute(
"""
CREATE VIEW schema.view_with_json_column AS
@@ -350,29 +344,26 @@ You can then parse this column on Spark side using `from_json <https://spark.apa
)
df = reader.run()
# Spark requires all columns to have some type, describe it
column_type = ArrayType(IntegerType())
# Define the schema for the JSON data
json_schema = ArrayType(IntegerType())
# cast column content to proper Spark type
df = df.select(
df.id,
df.supported_column,
from_json(df.array_column_as_json, schema).alias("array_column"),
JSON().parse_column(df.array_column_as_json, json_schema).alias("array_column"),
)
``DBWriter``
~~~~~~~~~~~~

It is always possible to convert data on Spark side to string, and then write it to ``text`` column in Greenplum table.
To write data to a ``text`` or ``json`` column in a Greenplum table, use the :obj:`JSON.serialize_column <onetl.file.format.json.JSON.serialize_column>` method.

For example, you can convert data using `to_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.to_json.html>`_ function.

.. code:: python
.. code-block:: python
from pyspark.sql.functions import to_json
from onetl.connection import Greenplum
from onetl.db import DBReader
from onetl.db import DBWriter
from onetl.file.format import JSON
greenplum = Greenplum(...)
@@ -390,7 +381,7 @@ For example, you can convert data using `to_json <https://spark.apache.org/docs/
write_df = df.select(
df.id,
df.supported_column,
to_json(df.array_column).alias("array_column_json"),
JSON().serialize_column(df.array_column).alias("array_column_json"),
)
writer = DBWriter(
29 changes: 13 additions & 16 deletions docs/connection/db_connection/mysql/types.rst
@@ -289,16 +289,15 @@ It is possible to explicitly cast column type using ``DBReader(columns=...)`` sy

For example, you can use ``CAST(column AS CHAR)`` to convert data to its string representation on the MySQL side, so it will be read as Spark's ``StringType()``.
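
For instance, a minimal sketch of such a cast inside ``DBReader(columns=...)`` (the ``schema.table`` source and column names here are illustrative):

.. code-block:: python

    from onetl.connection import MySQL
    from onetl.db import DBReader

    mysql = MySQL(...)

    reader = DBReader(
        connection=mysql,
        source="schema.table",
        columns=[
            "id",
            "supported_column",
            # cast is performed on the MySQL side, the result is read as Spark StringType()
            "CAST(unsupported_column AS CHAR) AS unsupported_column_str",
        ],
    )
    df = reader.run()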

It is also possible to use `JSON_OBJECT <https://dev.mysql.com/doc/refman/en/json.html>`_ MySQL function
to convert column of any type to string representation, and then parse this column on Spark side using
`from_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.from_json.html>`_:
It is also possible to use the `JSON_OBJECT <https://dev.mysql.com/doc/refman/en/json.html>`_ MySQL function to convert a column of any type to a JSON string, and then parse this column on the Spark side with the :obj:`JSON.parse_column <onetl.file.format.json.JSON.parse_column>` method.

.. code-block:: python
from pyspark.sql.types import IntegerType, StructField, StructType
from pyspark.sql.types import IntegerType, StructType, StructField
from onetl.connection import MySQL
from onetl.db import DBReader
from onetl.file.format import JSON
mysql = MySQL(...)
@@ -314,42 +313,40 @@ to convert column of any type to string representation, and then parse this colu
)
df = reader.run()
# Spark requires all columns to have some type, describe it
column_type = StructType([StructField("key", IntegerType())])
json_schema = StructType([StructField("key", IntegerType())])
# cast column content to proper Spark type
df = df.select(
df.id,
df.supported_column,
# explicit cast
df.unsupported_column_str.cast("integer").alias("parsed_integer"),
# or explicit json parsing
from_json(df.json_column, schema).alias("struct_column"),
JSON().parse_column("json_column", json_scheme).alias("struct_column"),
)
``DBWriter``
~~~~~~~~~~~~

Convert dataframe column to JSON using `to_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.to_json.html>`_,
and write it as ``text`` column in MySQL:
To write JSON data to a ``json`` or ``text`` column in a MySQL table, use the :obj:`JSON.serialize_column <onetl.file.format.json.JSON.serialize_column>` method.

.. code:: python
.. code-block:: python
from onetl.connection import MySQL
from onetl.db import DBWriter
from onetl.file.format import JSON
mysql.execute(
"""
CREATE TABLE schema.target_tbl AS (
CREATE TABLE schema.target_tbl (
id bigint,
array_column_json json -- any string type, actually
)
ENGINE = InnoDB
""",
)
from pyspark.sql.functions import to_json
df = df.select(
df.id,
to_json(df.array_column).alias("array_column_json"),
JSON().serialize_column(df.array_column).alias("array_column_json"),
)
writer.run(df)
27 changes: 11 additions & 16 deletions docs/connection/db_connection/oracle/types.rst
@@ -301,12 +301,12 @@ For example, you can use ``CAST(column AS CLOB)`` to convert data to string repr

It is also possible to use `JSON_ARRAY <https://docs.oracle.com/en/database/oracle/oracle-database/23/sqlrf/JSON_ARRAY.html>`_
or `JSON_OBJECT <https://docs.oracle.com/en/database/oracle/oracle-database/23/sqlrf/JSON_OBJECT.html>`_ Oracle functions
to convert column of any type to string representation, and then parse this column on Spark side using
`from_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.from_json.html>`_:
to convert a column of any type to its string representation. This JSON string can then be parsed using the :obj:`JSON.parse_column <onetl.file.format.json.JSON.parse_column>` method.

.. code-block:: python
from pyspark.sql.types import IntegerType
from onetl.file.format import JSON
from pyspark.sql.types import IntegerType, StructType, StructField
from onetl.connection import Oracle
from onetl.db import DBReader
@@ -325,32 +325,27 @@ to convert column of any type to string representation, and then parse this colu
)
df = reader.run()
# Spark requires all columns to have some type, describe it
column_type = IntegerType()
json_schema = StructType([StructField("key", IntegerType())])
# cast column content to proper Spark type
df = df.select(
df.id,
df.supported_column,
# explicit cast
df.unsupported_column_str.cast("integer").alias("parsed_integer"),
# or explicit json parsing
from_json(df.array_column_json, schema).alias("array_column"),
JSON().parse_column("array_column_json", json_scheme).alias("array_column"),
)
``DBWriter``
~~~~~~~~~~~~

It is always possible to convert data on Spark side to string, and then write it to ``text`` column in Oracle table.
It is always possible to convert data on the Spark side to a string, and then write it to a ``text`` column in an Oracle table.

For example, you can convert data using `to_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.to_json.html>`_ function.
To serialize and write JSON data to a ``text`` or ``json`` column in an Oracle table, use the :obj:`JSON.serialize_column <onetl.file.format.json.JSON.serialize_column>` method.

.. code:: python
from pyspark.sql.functions import to_json
.. code-block:: python
from onetl.connection import Oracle
from onetl.db import DBReader
from onetl.db import DBWriter
from onetl.file.format import JSON
oracle = Oracle(...)
@@ -367,7 +362,7 @@ For example, you can convert data using `to_json <https://spark.apache.org/docs/
write_df = df.select(
df.id,
df.supported_column,
to_json(df.unsupported_column).alias("array_column_json"),
JSON().serialize_column(df.unsupported_column).alias("array_column_json"),
)
writer = DBWriter(
40 changes: 20 additions & 20 deletions docs/connection/db_connection/postgres/types.rst
@@ -348,17 +348,15 @@ It is possible to explicitly cast column of unsupported type using ``DBReader(co

For example, you can use ``CAST(column AS text)`` to convert data to string representation on Postgres side, and so it will be read as Spark's ``StringType()``.

It is also possible to use `to_json <https://www.postgresql.org/docs/current/functions-json.html>`_ Postgres function
to convert column of any type to string representation, and then parse this column on Spark side using
`from_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.from_json.html>`_:
It is also possible to use the `to_json <https://www.postgresql.org/docs/current/functions-json.html>`_ Postgres function to convert a column of any type to its string representation, and then parse this column on the Spark side using the :obj:`JSON.parse_column <onetl.file.format.json.JSON.parse_column>` method:

.. code-block:: python
from pyspark.sql.functions import from_json
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
from onetl.connection import Postgres
from onetl.db import DBReader
from onetl.file.format import JSON
postgres = Postgres(...)
@@ -374,35 +372,37 @@ to convert column of any type to string representation, and then parse this colu
)
df = reader.run()
# Spark requires all columns to have some type, describe it
column_type = IntegerType()
# cast column content to proper Spark type
json_schema = StructType(
[
StructField("id", IntegerType(), nullable=True),
StructField("name", StringType(), nullable=True),
...,
]
)
df = df.select(
df.id,
df.supported_column,
# explicit cast
df.unsupported_column_str.cast("integer").alias("parsed_integer"),
# or explicit json parsing
from_json(df.array_column_json, schema).alias("array_column"),
JSON().parse_column("array_column_json", json_schema).alias("json_string"),
)
``DBWriter``
~~~~~~~~~~~~

It is always possible to convert data on Spark side to string, and then write it to ``text`` column in Postgres table.

Using ``to_json``
^^^^^^^^^^^^^^^^^
It is always possible to convert data on the Spark side to a string, and then write it to a text column in a Postgres table.

For example, you can convert data using `to_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.to_json.html>`_ function.
Using ``JSON.serialize_column``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You can use the :obj:`JSON.serialize_column <onetl.file.format.json.JSON.serialize_column>` method for data serialization:

.. code:: python
.. code-block:: python
from pyspark.sql.functions import to_json
from onetl.file.format import JSON
from pyspark.sql.functions import col
from onetl.connection import Postgres
from onetl.db import DBReader
from onetl.db import DBWriter
postgres = Postgres(...)
@@ -419,7 +419,7 @@ For example, you can convert data using `to_json <https://spark.apache.org/docs/
write_df = df.select(
df.id,
df.supported_column,
to_json(df.unsupported_column).alias("array_column_json"),
JSON().serialize_column(df.unsupported_column).alias("array_column_json"),
)
writer = DBWriter(
@@ -428,7 +428,7 @@ For example, you can convert data using `to_json <https://spark.apache.org/docs/
)
writer.run(write_df)
Then you can parse this column on Postgres side (for example, by creating a view):
Then you can parse this column on the Postgres side (for example, by creating a view):

.. code-block:: sql
2 changes: 1 addition & 1 deletion docs/file_df/file_formats/json.rst
@@ -6,4 +6,4 @@ JSON
.. currentmodule:: onetl.file.format.json

.. autoclass:: JSON
:members: __init__
:members: __init__, parse_column, serialize_column