[DOP-13840] - update documentation

maxim-lixakov committed Apr 19, 2024
1 parent 501a23c commit d6826f0

Showing 5 changed files with 77 additions and 106 deletions.
31 changes: 19 additions & 12 deletions docs/connection/db_connection/clickhouse/types.rst

Explicit type cast
------------------

``DBReader``
~~~~~~~~~~~~

Instead of using ``CAST`` or ``toJSONString`` to format column data as a JSON string, and then casting it back to the proper type using `from_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.from_json.html>`_, you can utilize the ``parse_column`` method from the :ref:`JSON <json-file-format>` class to handle data parsing directly:

.. code-block:: python

    from pyspark.sql.types import ArrayType, IntegerType

    from onetl.file.format import JSON
    from onetl.connection import ClickHouse
    from onetl.db import DBReader

    reader = DBReader(
        connection=clickhouse,
        columns=[
            "id",
            "array_column",  # fetch the raw data directly
        ],
    )
    df = reader.run()

    # Spark requires all columns to have some type, describe it
    column_type = ArrayType(IntegerType())

    # parse JSON content into the proper Spark type
    df = df.select(
        df.id,
        JSON().parse_column("array_column", column_type).alias("array_column"),
    )

``DBWriter``
~~~~~~~~~~~~

Instead of using `to_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.to_json.html>`_ to convert a DataFrame column to JSON and write it as a ``String`` column in Clickhouse, use the ``serialize_column`` method from the :ref:`JSON <json-file-format>` class:

.. code-block:: python

    from onetl.file.format import JSON
    from onetl.connection import ClickHouse
    from onetl.db import DBWriter

    clickhouse = ClickHouse(...)

    # create the target table with a string column for the serialized JSON
    clickhouse.execute(
        """
        ...
        """,
    )

    df = df.select(
        df.id,
        JSON().serialize_column(df.array_column).alias("array_column_json"),
    )

    writer.run(df)

42 changes: 12 additions & 30 deletions docs/connection/db_connection/greenplum/types.rst

Explicit type cast
------------------

``DBReader``
~~~~~~~~~~~~

Direct casting of unsupported column types in ``DBReader`` queries is not supported for Greenplum. Instead of passing a ``CAST`` expression to the ``SELECT`` query as-is, the connector performs type casts on the Spark side, so this syntax will fail:

.. code-block:: python

    DBReader(
        connection=greenplum,
        # will fail
        columns=["CAST(column AS text)"],
    )

A recommended workaround is to create a view within Greenplum that casts the unsupported column to ``text`` (or any other supported type). Instead of using SQL functions like ``to_json`` to produce a JSON string and then parsing it with `from_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.from_json.html>`_, it is advised to handle the parsing directly with the :ref:`JSON <json-file-format>` class:

.. code-block:: python

    from pyspark.sql.types import ArrayType, IntegerType

    from onetl.connection import Greenplum
    from onetl.db import DBReader
    from onetl.file.format import JSON

    greenplum = Greenplum(...)

    # create view with proper type cast
    greenplum.execute(
        """
        CREATE VIEW schema.view_with_json_column AS
        ...
        """,
    )

    # read from the view
    reader = DBReader(
        connection=greenplum,
        source="schema.view_with_json_column",
    )
    df = reader.run()

    # define the schema for the JSON data
    json_scheme = ArrayType(IntegerType())

    # cast column content to proper Spark type
    df = df.select(
        df.id,
        df.supported_column,
        JSON().parse_column(df.array_column_as_json, json_scheme).alias("array_column"),
    )

``DBWriter``
~~~~~~~~~~~~

It is always possible to convert data on the Spark side to a string and write it to a ``text`` or ``json`` column in a Greenplum table. To do so, use the ``serialize_column`` method of the :ref:`JSON <json-file-format>` class instead of the standard `to_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.to_json.html>`_ function:

.. code-block:: python

    from onetl.connection import Greenplum
    from onetl.db import DBWriter
    from onetl.file.format import JSON

    greenplum = Greenplum(...)

    write_df = df.select(
        df.id,
        df.supported_column,
        JSON().serialize_column(df.array_column).alias("array_column_json"),
    )

    writer = DBWriter(
        connection=greenplum,
        target="schema.target_tbl",  # target table name is illustrative
    )
    writer.run(write_df)

29 changes: 13 additions & 16 deletions docs/connection/db_connection/mysql/types.rst

It is possible to explicitly cast column type using ``DBReader(columns=...)`` syntax.

For example, you can use ``CAST(column AS text)`` to convert data to string representation on MySQL side, and so it will be read as Spark's ``StringType()``.
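
For instance, a minimal sketch of such a cast (the ``mysql`` connection object and the table name here are illustrative):

.. code-block:: python

    from onetl.db import DBReader

    reader = DBReader(
        connection=mysql,  # assumes an existing MySQL connection object
        source="schema.some_table",  # illustrative table name
        columns=[
            "id",
            "CAST(unsupported_column AS text) unsupported_column_str",
        ],
    )
    df = reader.run()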

Instead of parsing this column on the Spark side using `from_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.from_json.html>`_, you can directly use the :ref:`JSON <json-file-format>` class to handle the parsing:

.. code-block:: python

    from pyspark.sql.types import IntegerType, StructType, StructField

    from onetl.connection import MySQL
    from onetl.db import DBReader
    from onetl.file.format import JSON

    mysql = MySQL(...)

    reader = DBReader(...)
    df = reader.run()

    # Spark requires all columns to have some type, describe it
    json_scheme = StructType([StructField("key", IntegerType())])

    # cast column content to proper Spark type
    df = df.select(
        df.id,
        df.supported_column,
        # explicit cast
        df.unsupported_column_str.cast("integer").alias("parsed_integer"),
        # or explicit json parsing
        JSON().parse_column("json_column", json_scheme).alias("struct_column"),
    )

``DBWriter``
~~~~~~~~~~~~

Convert a DataFrame column to JSON and write it to a ``json`` or ``text`` column in MySQL using the ``serialize_column`` method provided by the :ref:`JSON <json-file-format>` class, instead of the standard `to_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.to_json.html>`_ function:

.. code-block:: python

    from onetl.connection import MySQL
    from onetl.db import DBWriter
    from onetl.file.format import JSON

    mysql.execute(
        """
        CREATE TABLE schema.target_tbl (
            id bigint,
            array_column_json json -- any string type, actually
        )
        ENGINE = InnoDB
        """,
    )

    df = df.select(
        df.id,
        JSON().serialize_column(df.array_column).alias("array_column_json"),
    )

    writer.run(df)

33 changes: 11 additions & 22 deletions docs/connection/db_connection/oracle/types.rst

Explicit type cast
------------------

``DBReader``
~~~~~~~~~~~~

It is possible to explicitly cast the column of an unsupported type using the ``DBReader(columns=...)`` syntax in Oracle queries. For example, you can use ``CAST(column AS VARCHAR2(4000))`` to convert data to string representation on the Oracle side, ensuring it will be read as Spark's ``StringType()``.
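
A minimal sketch of such an explicit cast (the ``oracle`` connection object and the table name here are illustrative):

.. code-block:: python

    from onetl.db import DBReader

    reader = DBReader(
        connection=oracle,  # assumes an existing Oracle connection object
        source="SCHEMA.SOME_TABLE",  # illustrative table name
        columns=[
            "id",
            "CAST(unsupported_column AS VARCHAR2(4000)) unsupported_column_str",
        ],
    )
    df = reader.run()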

Instead of using `from_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.from_json.html>`_, you can directly utilize the :ref:`JSON <json-file-format>` class to handle the parsing:

.. code-block:: python

    from pyspark.sql.types import IntegerType, StructType, StructField

    from onetl.file.format import JSON
    from onetl.connection import Oracle
    from onetl.db import DBReader

    oracle = Oracle(...)

    reader = DBReader(...)
    df = reader.run()

    # Spark requires all columns to have some type, describe it
    json_scheme = StructType([StructField("key", IntegerType())])

    # cast column content to proper Spark type
    df = df.select(
        df.id,
        df.supported_column,
        # explicit cast
        df.unsupported_column_str.cast("integer").alias("parsed_integer"),
        # or explicit json parsing
        JSON().parse_column("array_column_json", json_scheme).alias("array_column"),
    )

``DBWriter``
~~~~~~~~~~~~

Instead of using the `to_json <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.to_json.html>`_ function to convert DataFrame columns to JSON before writing them to a string column (``VARCHAR2`` or ``CLOB``) in an Oracle table, utilize the ``serialize_column`` method from the :ref:`JSON <json-file-format>` class:

.. code-block:: python

    from onetl.connection import Oracle
    from onetl.db import DBWriter
    from onetl.file.format import JSON

    oracle = Oracle(...)

    write_df = df.select(
        df.id,
        df.supported_column,
        JSON().serialize_column(df.unsupported_column).alias("array_column_json"),
    )

    writer = DBWriter(
        connection=oracle,
        target="schema.target_tbl",  # target table name is illustrative
    )
    writer.run(write_df)

48 changes: 22 additions & 26 deletions docs/connection/db_connection/postgres/types.rst

It is possible to explicitly cast column of unsupported type using ``DBReader(columns=...)`` syntax.

For example, you can use ``CAST(column AS text)`` to convert data to string representation on Postgres side, and so it will be read as Spark's ``StringType()``.

To convert a column of any type to its string representation and then parse it on the Spark side, you can use methods of the :ref:`File Formats <file-formats>` classes. For example, for a JSON string you might use the :ref:`JSON <json-file-format>` class:

.. code-block:: python

    from pyspark.sql.types import IntegerType, StringType, StructField, StructType

    from onetl.connection import Postgres
    from onetl.db import DBReader
    from onetl.file.format import JSON

    postgres = Postgres(...)

    reader = DBReader(
        connection=postgres,
        source="schema.some_table",  # illustrative table name
        columns=[
            "id",
            "supported_column",
            "CAST(unsupported_column AS text) unsupported_column_str",
        ],
    )
    df = reader.run()

    # describe the Spark type the JSON content should be parsed into
    json_schema = StructType(
        [
            StructField("id", IntegerType(), nullable=True),
            StructField("name", StringType(), nullable=True),
            ...,
        ]
    )

    df = df.select(
        df.id,
        df.supported_column,
        # explicit cast
        df.unsupported_column_str.cast("integer").alias("parsed_integer"),
        # or explicit json parsing
        JSON().parse_column("unsupported_column_str", json_schema).alias("json_string"),
    )

``DBWriter``
~~~~~~~~~~~~

It is always possible to convert data on the Spark side to a string, and then write it to a ``text`` column in a Postgres table.

Using ``JSON().serialize_column``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can utilize the ``serialize_column`` method from the :ref:`JSON <json-file-format>` class to handle data serialization:

.. code-block:: python

    from onetl.file.format import JSON
    from onetl.connection import Postgres
    from onetl.db import DBWriter

    postgres = Postgres(...)

    write_df = df.select(
        df.id,
        df.supported_column,
        JSON().serialize_column(df.unsupported_column).alias("array_column_json"),
    )

    writer = DBWriter(
        connection=postgres,
        target="schema.target_tbl",  # target table name is illustrative
    )
    writer.run(write_df)

Then you can parse this column on the Postgres side (for example, by creating a view):

.. code-block:: sql
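
    -- A minimal sketch: cast the serialized JSON string back to jsonb on the Postgres side.
    -- The table, view and column names here are illustrative.
    CREATE VIEW schema.parsed_target_tbl AS
    SELECT
        id,
        supported_column,
        array_column_json::jsonb AS array_column
    FROM schema.target_tbl;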
