Skip to content

Commit

Permalink
[DOP-9085] Improve Greenplum documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
dolfinus committed Sep 26, 2023
1 parent 9557268 commit ae6bde6
Show file tree
Hide file tree
Showing 10 changed files with 469 additions and 29 deletions.
4 changes: 4 additions & 0 deletions docs/changelog/next_release/154.improvement.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Drastically improve ``Greenplum`` documentation:
* Added information about network ports, grants, ``pg_hba.conf`` and so on.
* Added interaction schemas for reading, writing and executing statements in Greenplum.
* Added recommendations about reading data from views and ``JOIN`` results from Greenplum.
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"sphinx.ext.autosummary",
"sphinxcontrib.autodoc_pydantic",
"sphinxcontrib.towncrier", # provides `towncrier-draft-entries` directive
"sphinxcontrib.plantuml",
]
numpydoc_show_class_members = True
autodoc_pydantic_model_show_config = False
Expand Down
41 changes: 41 additions & 0 deletions docs/connection/db_connection/greenplum/execute.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,47 @@
Executing statements in Greenplum
==================================

Interaction schema
------------------

Unlike reading & writing, executing statements in Greenplum is done **only** through Greenplum master node,
without any interaction between Greenplum segments and Spark executors. More than that, Spark executors are not used in this case.

The only port used while interacting with Greenplum in this case is ``5432`` (Greenplum master port).

.. dropdown:: Spark <-> Greenplum interaction during Greenplum.execute()/Greenplum.fetch()

.. plantuml::

@startuml
title Greenplum master <-> Spark driver
box "Spark"
participant "Spark driver"
end box

box "Greenplum"
participant "Greenplum master"
end box

== Greenplum.check() ==

activate "Spark driver"
"Spark driver" -> "Greenplum master" ++ : CONNECT

== Greenplum.execute(statement) ==
"Spark driver" --> "Greenplum master" : EXECUTE statement
"Greenplum master" -> "Spark driver" : RETURN result

== Greenplum.close() ==
"Spark driver" --> "Greenplum master" : CLOSE CONNECTION

deactivate "Greenplum master"
deactivate "Spark driver"
@enduml

Options
-------

.. currentmodule:: onetl.connection.db_connection.greenplum.connection

.. automethod:: Greenplum.fetch
Expand Down
176 changes: 176 additions & 0 deletions docs/connection/db_connection/greenplum/prerequisites.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,179 @@ There are several ways to do that. See :ref:`java-packages` for details.

If you're uploading package to private package repo, use ``groupId=io.pivotal`` and ``artifactoryId=greenplum-spark_2.12``
(``2.12`` is Scala version) to give uploaded package a proper name.

Connecting to Greenplum
-----------------------

Interaction schema
~~~~~~~~~~~~~~~~~~

Spark executors open ports to listen incoming requests.
Greenplum segments are initiating connections to Spark executors using `EXTERNAL TABLE <https://docs.vmware.com/en/VMware-Greenplum/7/greenplum-database/ref_guide-sql_commands-CREATE_EXTERNAL_TABLE.html>`_
functionality, and send/read data using `gpfdist <https://docs.vmware.com/en/VMware-Greenplum/index.html>`_ protocol.

Data is **not** send through Greenplum master.
Greenplum master only receives commands to start reading/writing process, and manages all the metadata (external table location, schema and so on).

More details can be found in `official documentation <https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.1/greenplum-connector-spark/overview.html>`_.

Number of parallel connections
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. warning::

This is very important!!!

If you don't limit number of connections, you can exceed the `max_connections <https://docs.vmware.com/en/VMware-Greenplum/6/greenplum-database/admin_guide-client_auth.html#limiting-concurrent-connections>`_
limit set on the Greenplum side. It's usually not so high, e.g. 500-1000 connections max,
depending on your Greenplum instance settings and using connection balancers like ``pgbouncer``.

Consuming all available connections means **nobody** (even admin users) can connect to Greenplum.

Each job on the Spark executor makes its own connection to Greenplum master node,
so you need to limit number of connections to avoid opening too many of them.

* Reading about ``5-10Gb`` of data requires about ``3-5`` parallel connections.
* Reading about ``20-30Gb`` of data requires about ``5-10`` parallel connections.
* Reading about ``50Gb`` of data requires ~ ``10-20`` parallel connections.
* Reading about ``100+Gb`` of data requires ``20-30`` parallel connections.
* Opening more than ``30-50`` connections is not recommended.

Number of connections can be limited by 2 ways:

* By limiting number of Spark executors and number of cores per-executor. Max number of parallel jobs is ``executors * cores``.

.. tabs::

.. code-tab:: py Spark with master=local

(
SparkSession.builder
# Spark will start EXACTLY 10 executors with 1 core each, so max number of parallel jobs is 10
.config("spark.master", "local[10]")
.config("spark.executor.cores", 1)
)

.. code-tab:: py Spark with master=yarn or master=k8s, dynamic allocation

(
SparkSession.builder
.config("spark.master", "yarn")
# Spark will start MAX 10 executors with 1 core each (dynamically), so max number of parallel jobs is 10
.config("spark.dynamicAllocation.maxExecutors", 10)
.config("spark.executor.cores", 1)
)

.. code-tab:: py Spark with master=yarn or master=k8s, static allocation

(
SparkSession.builder
.config("spark.master", "yarn")
# Spark will start EXACTLY 10 executors with 1 core each, so max number of parallel jobs is 10
.config("spark.executor.instances", 10)
.config("spark.executor.cores", 1)
)

* By limiting connection pool size user by Spark (**only** for Spark with ``master=local``):

.. code:: python
spark = SparkSession.builder.config("spark.master", "local[*]").getOrCreate()
# No matter how many executors are started and how many cores they have,
# number of connections cannot exceed pool size:
Greenplum(
...,
extra={
"pool.maxSize": 10,
},
)
See `connection pooling <https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.1/greenplum-connector-spark/using_the_connector.html#jdbcconnpool>`_
documentation.


* By setting :obj:`num_partitions <onetl.connection.db_connection.greenplum.options.GreenplumReadOptions.num_partitions>`
and :obj:`partition_column <onetl.connection.db_connection.greenplum.options.GreenplumReadOptions.partition_column>` (not recommended).

Allowing connection to Greenplum master
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Ask your Greenplum cluster administrator to allow your user to connect to Greenplum master node,
e.g. by updating ``pg_hba.conf`` file.

More details can be found in `official documentation <https://docs.vmware.com/en/VMware-Greenplum/6/greenplum-database/admin_guide-client_auth.html>`_.

Network ports
~~~~~~~~~~~~~

To read data from Greenplum using Spark, following ports should be opened in firewall between Spark and Greenplum:

* Spark driver and all Spark executors -> port ``5432`` on Greenplum master node.

This port number should be set while connecting to Greenplum:

.. code:: python
Greenplum(host="master.host", port=5432, ...)
* Greenplum segments -> some port range (e.g. ``41000-42000``) **listened by Spark executor**.

This range should be set in ``extra`` option:

.. code:: python
Greenplum(
...,
extra={
"server.port": "41000-42000",
},
)
Number of ports in this range is ``number of parallel running Spark sessions`` * ``number of parallel connections per session``.

Number of connections per session (see below) is usually less than ``30`` (see below).

Number of session depends on your environment:
* For ``master=local`` only few ones-tens sessions can be started on the same host, depends on available RAM and CPU.

* For ``master=yarn`` / ``master=k8s`` hundreds or thousands of sessions can be started simultaneously,
but they are executing on different cluster nodes, so one port can be opened on different nodes at the same time.

More details can be found in official documentation:
* `port requirements <https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.1/greenplum-connector-spark/sys_reqs.html#network-port-requirements>`_
* `format of server.port value <https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.1/greenplum-connector-spark/options.html#server.port>`_
* `port troubleshooting <https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.1/greenplum-connector-spark/troubleshooting.html#port-errors>`_

Required grants
~~~~~~~~~~~~~~~

Ask your Greenplum cluster administrator to set following grants for a user,
used for creating a connection:

.. tabs::

.. code-tab:: sql Reading & writing

GRANT USAGE ON SCHEMA myschema TO username;
GRANT CREATE ON SCHEMA myschema TO username;
GRANT SELECT, INSERT ON SCHEMA myschema.mytable TO username;
ALTER USER username CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist') CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');

.. code-tab:: sql Reading from Greenplum

GRANT USAGE ON SCHEMA schema_to_read TO username;
GRANT CREATE ON SCHEMA schema_to_read TO username;
GRANT SELECT ON SCHEMA schema_to_read.table_to_read TO username;
-- yes, ``writable``, because data is written from Greenplum to Spark executor.
ALTER USER username CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');

.. code-tab:: sql Writing to Greenplum

GRANT USAGE ON SCHEMA schema_to_write TO username;
GRANT CREATE ON SCHEMA schema_to_write TO username;
GRANT SELECT, INSERT ON SCHEMA schema_to_write.table_to_write TO username;
-- yes, ``readable``, because data is read from Spark executor to Greenplum.
ALTER USER username CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist');

More details can be found in `official documentation <https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.1/greenplum-connector-spark/install_cfg.html#role-privileges>`_.
141 changes: 132 additions & 9 deletions docs/connection/db_connection/greenplum/read.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,143 @@ For reading data from Greenplum, use :obj:`DBReader <onetl.db.db_reader.db_reade
.. note::

Unlike JDBC connectors, *Greenplum connector for Spark* does not support
executing **custom** SQL queries using ``.sql`` method, because this leads to sending
the result through *master* node which is really bad for cluster performance.
executing **custom** SQL queries using ``.sql`` method. Connector can be used to only read some table data
(with filters, if needed) using DBReader.

To make distributed queries like ``JOIN`` **on Greenplum side**, you should create a staging table,
populate it with the data you need (using ``.execute`` method to call ``INSERT INTO ... AS SELECT ...``),
then read the data from this table using :obj:`DBReader <onetl.db.db_reader.db_reader.DBReader>`,
and drop staging table after reading is finished.
Interaction schema
------------------

In this case data will be read directly from Greenplum segment nodes in a distributed way.
High-level schema is described in :ref:`greenplum-prerequisites`. You can find detailed interaction schema below.

.. dropdown:: Spark <-> Greenplum interaction during DBReader.run()

.. plantuml::

@startuml
title Greenplum master <-> Spark driver
box "Spark"
participant "Spark driver"
participant "Spark executor1"
participant "Spark executor2"
participant "Spark executorN"
end box

box "Greenplum"
participant "Greenplum master"
participant "Greenplum segment1"
participant "Greenplum segment2"
participant "Greenplum segmentN"
end box

== Greenplum.check() ==

activate "Spark driver"
"Spark driver" -> "Greenplum master" ++ : CONNECT

"Spark driver" --> "Greenplum master" : CHECK IF TABLE EXISTS gp_table
"Greenplum master" --> "Spark driver" : TABLE EXISTS
"Spark driver" -> "Greenplum master" : SHOW SCHEMA FOR gp_table
"Greenplum master" --> "Spark driver" : (id bigint, col1 int, col2 text, ...)

== DBReader.run() ==

"Spark driver" -> "Spark executor1" ++ : START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 1
"Spark driver" -> "Spark executor2" ++ : START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 2
"Spark driver" -> "Spark executorN" ++ : START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION N

note right of "Spark driver" : This is done in parallel,\nexecutors are independent\n|\n|\n|\nV
"Spark executor1" -> "Greenplum master" ++ : CREATE WRITABLE EXTERNAL TABLE spark_executor1 (id bigint, col1 int, col2 text, ...) USING address=executor1_host:executor1_port;\nINSERT INTO EXTERNAL TABLE spark_executor1 FROM gp_table WHERE gp_segment_id = 1
note right of "Greenplum master" : Each white vertical line here is a opened connection to master.\nUsually, **N+1** connections are created from Spark to Greenplum master
"Greenplum master" --> "Greenplum segment1" ++ : SELECT DATA FROM gp_table_data_on_segment1 TO spark_executor1
note right of "Greenplum segment1" : No direct requests between Greenplum segments & Spark.\nData transfer is always initiated by Greenplum segments.

"Spark executor2" -> "Greenplum master" ++ : CREATE WRITABLE EXTERNAL TABLE spark_executor2 (id bigint, col1 int, col2 text, ...) USING address=executor2_host:executor2_port;\nINSERT INTO EXTERNAL TABLE spark_executor2 FROM gp_table WHERE gp_segment_id = 2
"Greenplum master" --> "Greenplum segment2" ++ : SELECT DATA FROM gp_table_data_on_segment2 TO spark_executor2

"Spark executorN" -> "Greenplum master" ++ : CREATE WRITABLE EXTERNAL TABLE spark_executorN (id bigint, col1 int, col2 text, ...) USING address=executorN_host:executorN_port;\nINSERT INTO EXTERNAL TABLE spark_executorN FROM gp_table WHERE gp_segment_id = N
"Greenplum master" --> "Greenplum segmentN" ++ : SELECT DATA FROM gp_table_data_on_segmentN TO spark_executorN

"Greenplum segment1" ->o "Spark executor1" -- : INITIALIZE CONNECTION TO Spark executor1\nPUSH DATA TO Spark executor1
note left of "Spark executor1" : Circle is an open GPFDIST port,\nlistened by executor

"Greenplum segment2" ->o "Spark executor2" -- : INITIALIZE CONNECTION TO Spark executor2\nPUSH DATA TO Spark executor2
"Greenplum segmentN" ->o "Spark executorN" -- : INITIALIZE CONNECTION TO Spark executorN\nPUSH DATA TO Spark executorN

== Spark.stop() ==

"Spark executor1" --> "Greenplum master" : DROP TABLE spark_executor1
deactivate "Greenplum master"
"Spark executor2" --> "Greenplum master" : DROP TABLE spark_executor2
deactivate "Greenplum master"
"Spark executorN" --> "Greenplum master" : DROP TABLE spark_executorN
deactivate "Greenplum master"

"Spark executor1" --> "Spark driver" -- : DONE
"Spark executor2" --> "Spark driver" -- : DONE
"Spark executorN" --> "Spark driver" -- : DONE

"Spark driver" --> "Greenplum master" : CLOSE CONNECTION
deactivate "Greenplum master"
deactivate "Spark driver"
@enduml

Recommendations
---------------

Reading from views
~~~~~~~~~~~~~~~~~~

This connector is **NOT** designed to read data from views.

You can technically read data from a view which has
`gp_segment_id <https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.1/greenplum-connector-spark/troubleshooting.html#reading-from-a-view>`_ column.
But this is **not** recommended because each Spark executor will run the same query, which may lead to running duplicated calculations
and sending data between segments only to skip most of the result and select only small part.

Prefer following option:
* Create staging table to store result data, using :obj:`Greenplum.execute <onetl.connection.db_connection.greenplum.connection.Greenplum.execute>`
* Use the same ``.execute`` method run a query ``INSERT INTO staging_table AS SELECT FROM some_view``. This will be done on Greenplum segments side, query will be run only once.
* Read data from staging table to Spark executor using :obj:`DBReader <onetl.db.db_reader.db_reader.DBReader>`.
* Drop staging table using ``.execute`` method.

Using ``JOIN`` on Greenplum side
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you need to get data of joining 2 tables in Greenplum, you should:
* Create staging table to store result data, using ``Greenplum.execute``
* Use the same ``Greenplum.execute`` run a query ``INSERT INTO staging_table AS SELECT FROM table1 JOIN table2``. This will be done on Greenplum segments side, in a distributed way.
* Read data from staging table to Spark executor using ``DBReader``.
* Drop staging table using ``Greenplum.execute``.

.. warning::

Greenplum connection does **NOT** support reading data from views which does not have ``gp_segment_id`` column.
Either add this column to a view, or use stating table solution (see above).
Do **NOT** try to read data from ``table1`` and ``table2`` using ``DBReader``, and then join the resulting dataframes!

This will lead to sending all the data from both tables to Spark executor memory, and then ``JOIN``
will be performed on Spark side, not Greenplum. This is **very** inefficient.

Using ``TEMPORARY`` tables
~~~~~~~~~~~~~~~~~~~~~~~~~~

Someone could think that writing data from ``VIEW`` or result of ``JOIN`` to ``TEMPORARY`` table,
and then passing it to DBReader, is an efficient way to read data from Greenplum, because temp tables are not generating WAL files,
and are automatically deleted after finishing the transaction.

That's will **not** work. Each Spark executor establishes its own connection to Greenplum,
and thus reads its own temporary table, which does not contain any data.

You should use `UNLOGGED <https://docs.vmware.com/en/VMware-Greenplum/7/greenplum-database/ref_guide-sql_commands-CREATE_TABLE.html>`_ tables
to write data to staging table without generating useless WAL logs.

Mapping of Greenplum types to Spark types
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

See `official documentation <https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.1/greenplum-connector-spark/reference-datatype_mapping.html#greenplum-to-spark>`_
for more details.
onETL does not perform any additional casting of types while reading data.

Options
-------

.. currentmodule:: onetl.connection.db_connection.greenplum.options

Expand Down
Loading

0 comments on commit ae6bde6

Please sign in to comment.