[DOP-13840] - add JSON.parse_column and JSON.serialize_column methods #257

Merged · 3 commits · Apr 23, 2024
1 change: 1 addition & 0 deletions docs/changelog/next_release/257.feature.rst
@@ -0,0 +1 @@
Add ``JSON.parse_column`` and ``JSON.serialize_column`` methods to facilitate direct parsing of JSON strings into Spark DataFrame columns and serialization of structured DataFrame columns back into JSON strings.
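A minimal illustrative sketch of the round trip the changelog describes (not part of the diff; it assumes an existing ``SparkSession`` named ``spark``, and the DataFrame and column names are made up for demonstration):

.. code-block:: python

from pyspark.sql.types import ArrayType, IntegerType

from onetl.file.format import JSON

# toy DataFrame with a JSON string column (illustrative data)
df = spark.createDataFrame([(1, "[1, 2, 3]")], ["id", "array_column_json"])

# parse the JSON string into a typed Spark column
array_schema = ArrayType(IntegerType())
parsed_df = df.select(
df.id,
JSON().parse_column("array_column_json", array_schema).alias("array_column"),
)

# serialize the structured column back into a JSON string
serialized_df = parsed_df.select(
parsed_df.id,
JSON().serialize_column(parsed_df.array_column).alias("array_column_json"),
)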
28 changes: 18 additions & 10 deletions docs/connection/db_connection/clickhouse/types.rst
@@ -336,13 +336,17 @@ Explicit type cast
~~~~~~~~~~~~

Use ``CAST`` or ``toJSONString`` to get column data as a string in JSON format, and then parse that string column with the :obj:`JSON.parse_column <onetl.file.format.json.JSON.parse_column>` method:

.. code-block:: python

from pyspark.sql.types import ArrayType, IntegerType

from onetl.connection import ClickHouse
from onetl.db import DBReader
from onetl.file.format import JSON

reader = DBReader(
connection=clickhouse,
columns=[
@@ -357,16 +361,22 @@

df = df.select(
df.id,
JSON().parse_column("array_column", column_type),
)

``DBWriter``
~~~~~~~~~~~~

To write JSON data to ClickHouse, use the :obj:`JSON.serialize_column <onetl.file.format.json.JSON.serialize_column>` method to convert a DataFrame column to a JSON string, and write the result as a ``String`` column:


.. code-block:: python

from onetl.connection import ClickHouse
from onetl.db import DBWriter
from onetl.file.format import JSON

clickhouse = ClickHouse(...)

clickhouse.execute(
"""
@@ -379,11 +389,9 @@
""",
)

df = df.select(
df.id,
JSON().serialize_column(df.array_column).alias("array_column_json"),
)

writer.run(df)
39 changes: 15 additions & 24 deletions docs/connection/db_connection/greenplum/types.rst
@@ -301,35 +301,29 @@ Explicit type cast
``DBReader``
~~~~~~~~~~~~

Direct casting of an unsupported column to a supported type is not possible on the ``DBReader`` side:

.. code-block:: python

reader = DBReader(
connection=greenplum,
# will fail
columns=["CAST(unsupported_column AS text)"],
)

This is related to the Greenplum connector implementation: instead of passing the ``CAST`` expression to the ``SELECT`` query as is, it performs the type cast on the Spark side, so this syntax is not supported.

But there is a workaround - create a view that casts the unsupported column to ``text`` (or any other supported type).
For example, you can use the `to_json <https://www.postgresql.org/docs/current/functions-json.html>`_ Postgres function to convert a column of any type to its string representation, and then parse this column on the Spark side using the :obj:`JSON.parse_column <onetl.file.format.json.JSON.parse_column>` method:

.. code-block:: python

from pyspark.sql.types import ArrayType, IntegerType

from onetl.connection import Greenplum
from onetl.db import DBReader
from onetl.file.format import JSON

greenplum = Greenplum(...)

# create view with proper type cast
greenplum.execute(
"""
CREATE VIEW schema.view_with_json_column AS
@@ -350,29 +344,26 @@
)
df = reader.run()

# Define the schema for the JSON data
json_schema = ArrayType(IntegerType())

# cast column content to proper Spark type
df = df.select(
df.id,
df.supported_column,
JSON().parse_column(df.array_column_as_json, json_schema).alias("array_column"),
)

``DBWriter``
~~~~~~~~~~~~

To write data to a ``text`` or ``json`` column in a Greenplum table, use the :obj:`JSON.serialize_column <onetl.file.format.json.JSON.serialize_column>` method:

.. code-block:: python

from onetl.connection import Greenplum
from onetl.db import DBWriter
from onetl.file.format import JSON

greenplum = Greenplum(...)

@@ -390,7 +381,7 @@
write_df = df.select(
df.id,
df.supported_column,
JSON().serialize_column(df.array_column).alias("array_column_json"),
)

writer = DBWriter(
29 changes: 13 additions & 16 deletions docs/connection/db_connection/mysql/types.rst
@@ -289,16 +289,15 @@

For example, you can use ``CAST(column AS text)`` to convert data to its string representation on the MySQL side, so it will be read as Spark's ``StringType()``.

It is also possible to use the `JSON_OBJECT <https://dev.mysql.com/doc/refman/en/json.html>`_ MySQL function to convert a column of any type to its JSON string representation, and then parse this column on the Spark side with the :obj:`JSON.parse_column <onetl.file.format.json.JSON.parse_column>` method:

.. code-block:: python

from pyspark.sql.types import IntegerType, StructField, StructType

from onetl.connection import MySQL
from onetl.db import DBReader
from onetl.file.format import JSON

mysql = MySQL(...)

@@ -314,42 +313,40 @@
)
df = reader.run()

json_schema = StructType([StructField("key", IntegerType())])

# cast column content to proper Spark type
df = df.select(
df.id,
df.supported_column,
# explicit cast
df.unsupported_column_str.cast("integer").alias("parsed_integer"),
# or explicit json parsing
JSON().parse_column("json_column", json_schema).alias("struct_column"),
)

``DBWriter``
~~~~~~~~~~~~

To write JSON data to a ``json`` or ``text`` column in a MySQL table, use the :obj:`JSON.serialize_column <onetl.file.format.json.JSON.serialize_column>` method:

.. code-block:: python

from onetl.connection import MySQL
from onetl.db import DBWriter
from onetl.file.format import JSON

mysql.execute(
"""
CREATE TABLE schema.target_tbl (
id bigint,
array_column_json json -- any string type, actually
)
ENGINE = InnoDB
""",
)

df = df.select(
df.id,
JSON().serialize_column(df.array_column).alias("array_column_json"),
)

writer.run(df)
27 changes: 11 additions & 16 deletions docs/connection/db_connection/oracle/types.rst
@@ -301,12 +301,12 @@

It is also possible to use `JSON_ARRAY <https://docs.oracle.com/en/database/oracle/oracle-database/23/sqlrf/JSON_ARRAY.html>`_
or `JSON_OBJECT <https://docs.oracle.com/en/database/oracle/oracle-database/23/sqlrf/JSON_OBJECT.html>`_ Oracle functions
to convert a column of any type to its JSON string representation, and then parse this column on the Spark side using the :obj:`JSON.parse_column <onetl.file.format.json.JSON.parse_column>` method:

.. code-block:: python

from pyspark.sql.types import IntegerType, StructField, StructType

from onetl.connection import Oracle
from onetl.db import DBReader
from onetl.file.format import JSON
@@ -325,32 +325,27 @@
)
df = reader.run()

json_schema = StructType([StructField("key", IntegerType())])

# cast column content to proper Spark type
df = df.select(
df.id,
df.supported_column,
# explicit cast
df.unsupported_column_str.cast("integer").alias("parsed_integer"),
# or explicit json parsing
JSON().parse_column("array_column_json", json_schema).alias("array_column"),
)

``DBWriter``
~~~~~~~~~~~~

It is always possible to convert data on the Spark side to a string, and then write it to a text column in an Oracle table.

To serialize and write JSON data to a ``text`` or ``json`` column in an Oracle table, use the :obj:`JSON.serialize_column <onetl.file.format.json.JSON.serialize_column>` method:

.. code-block:: python

from onetl.connection import Oracle
from onetl.db import DBWriter
from onetl.file.format import JSON

oracle = Oracle(...)

@@ -367,7 +362,7 @@
write_df = df.select(
df.id,
df.supported_column,
JSON().serialize_column(df.unsupported_column).alias("array_column_json"),
)

writer = DBWriter(
40 changes: 20 additions & 20 deletions docs/connection/db_connection/postgres/types.rst
@@ -348,17 +348,15 @@

For example, you can use ``CAST(column AS text)`` to convert data to its string representation on the Postgres side, so it will be read as Spark's ``StringType()``.

It is also possible to use the `to_json <https://www.postgresql.org/docs/current/functions-json.html>`_ Postgres function to convert a column of any type to its string representation, and then parse this column on the Spark side using the :obj:`JSON.parse_column <onetl.file.format.json.JSON.parse_column>` method:

.. code-block:: python

from pyspark.sql.types import IntegerType, StringType, StructField, StructType

from onetl.connection import Postgres
from onetl.db import DBReader
from onetl.file.format import JSON

postgres = Postgres(...)

@@ -374,35 +372,37 @@
)
df = reader.run()

json_schema = StructType(
[
StructField("id", IntegerType(), nullable=True),
StructField("name", StringType(), nullable=True),
...,
]
)
df = df.select(
df.id,
df.supported_column,
# explicit cast
df.unsupported_column_str.cast("integer").alias("parsed_integer"),
# or explicit json parsing
JSON().parse_column("array_column_json", json_schema).alias("json_string"),
)

``DBWriter``
~~~~~~~~~~~~

It is always possible to convert data on the Spark side to a string, and then write it to a ``text`` column in a Postgres table.

Using ``JSON.serialize_column``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can use the :obj:`JSON.serialize_column <onetl.file.format.json.JSON.serialize_column>` method for data serialization:

.. code-block:: python

from onetl.connection import Postgres
from onetl.db import DBWriter
from onetl.file.format import JSON

postgres = Postgres(...)

@@ -419,7 +419,7 @@
write_df = df.select(
df.id,
df.supported_column,
JSON().serialize_column(df.unsupported_column).alias("array_column_json"),
)

writer = DBWriter(
@@ -428,7 +428,7 @@
)
writer.run(write_df)

Then you can parse this column on the Postgres side (for example, by creating a view):

.. code-block:: sql

2 changes: 1 addition & 1 deletion docs/file_df/file_formats/json.rst
@@ -6,4 +6,4 @@ JSON
.. currentmodule:: onetl.file.format.json

.. autoclass:: JSON
:members: __init__, parse_column, serialize_column