Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions docs/apache-airflow-providers-google/connections/gcp_sql.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,40 @@ Extra (optional)
.. code-block:: bash

export AIRFLOW_CONN_GOOGLE_CLOUD_SQL_DEFAULT='gcpcloudsql://user:XXXXXXXXX@1.1.1.1:3306/mydb?database_type=mysql&project_id=example-project&location=europe-west1&instance=testinstance&use_proxy=True&sql_proxy_use_tcp=False'

Configuring and using IAM authentication
----------------------------------------

.. warning::
This functionality requires ``gcloud`` command (Google Cloud SDK) must be `installed
<https://cloud.google.com/sdk/docs/install>`_ on the Airflow worker.

.. warning::
IAM authentication working only for Google Service Accounts.

Configure Service Accounts on Google Cloud IAM side
"""""""""""""""""""""""""""""""""""""""""""""""""""

For connecting via IAM you need to use Service Account. It can be the same service account which you use for
the ``gcloud`` authentication or an another account. If you decide to use a different account then this
account should be impersonated from the account which used for ``gcloud`` authentication and granted
a ``Service Account Token Creator`` role. More information how to grant a role `here
<https://cloud.google.com/iam/docs/manage-access-service-accounts?hl=en&_gl=1*3bsv5i*_ga*NDY4NDIyNTcxLjE3MjkxNzQ4MTM.*_ga_WH2QY8WWF5*MTcyOTE5MzU1OS4yLjEuMTcyOTE5NTM0My4wLjAuMA..#single-role>`_.

Also the Service Account should be configured for working with IAM.
Here are links describing what should be done before the start: `PostgreSQL
<https://cloud.google.com/sql/docs/postgres/iam-logins#before_you_begin>`_ and `MySQL
<https://cloud.google.com/sql/docs/mysql/iam-logins#before_you_begin>`_.

Configure ``gcpcloudsql`` connection with IAM enabling
""""""""""""""""""""""""""""""""""""""""""""""""""""""

For using IAM you need to enable ``"use_iam": "True"`` in the ``extra`` field. And specify IAM account in this format
``USERNAME@PROJECT_ID.iam.gserviceaccount.com`` in ``login`` field and empty string in the ``password`` field.

For example:

.. exampleinclude:: /../../providers/tests/system/google/cloud/cloud_sql/example_cloud_sql_query_iam.py
:language: python
:start-after: [START howto_operator_cloudsql_iam_connections]
:end-before: [END howto_operator_cloudsql_iam_connections]
41 changes: 39 additions & 2 deletions providers/src/airflow/providers/google/cloud/hooks/cloud_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import platform
import random
import re
import shlex
import shutil
import socket
import string
Expand Down Expand Up @@ -777,6 +778,8 @@ class CloudSQLDatabaseHook(BaseHook):
SQL DB.
* **use_ssl** - (default False) Whether SSL should be used to connect to Cloud SQL DB.
You cannot use proxy and SSL together.
* **use_iam** - (default False) Whether IAM should be used to connect to Cloud SQL DB.
With using IAM password field should be empty string.
* **sql_proxy_use_tcp** - (default False) If set to true, TCP is used to connect via
proxy, otherwise UNIX sockets are used.
* **sql_proxy_version** - Specific version of the proxy to download (for example
Expand Down Expand Up @@ -839,11 +842,16 @@ def __init__(
self.database_type = self.extras.get("database_type")
self.use_proxy = self._get_bool(self.extras.get("use_proxy", "False"))
self.use_ssl = self._get_bool(self.extras.get("use_ssl", "False"))
self.use_iam = self._get_bool(self.extras.get("use_iam", "False"))
self.sql_proxy_use_tcp = self._get_bool(self.extras.get("sql_proxy_use_tcp", "False"))
self.sql_proxy_version = self.extras.get("sql_proxy_version")
self.sql_proxy_binary_path = sql_proxy_binary_path
self.user = self.cloudsql_connection.login
self.password = self.cloudsql_connection.password
if self.use_iam:
self.user = self._get_iam_db_login()
self.password = self._generate_login_token(service_account=self.cloudsql_connection.login)
else:
self.user = self.cloudsql_connection.login
self.password = self.cloudsql_connection.password
self.public_ip = self.cloudsql_connection.host
self.public_port = self.cloudsql_connection.port
self.ssl_cert = ssl_cert
Expand Down Expand Up @@ -1187,3 +1195,32 @@ def free_reserved_port(self) -> None:
if self.reserved_tcp_socket:
self.reserved_tcp_socket.close()
self.reserved_tcp_socket = None

def _get_iam_db_login(self) -> str:
"""Get an IAM login for Cloud SQL database."""
if not self.cloudsql_connection.login:
raise AirflowException("The login parameter needs to be set in connection")

if self.database_type == "postgres":
return self.cloudsql_connection.login.split(".gserviceaccount.com")[0]
else:
return self.cloudsql_connection.login.split("@")[0]

def _generate_login_token(self, service_account) -> str:
"""Generate an IAM login token for Cloud SQL and return the token."""
cmd = ["gcloud", "sql", "generate-login-token", f"--impersonate-service-account={service_account}"]
self.log.info("Executing command: %s", " ".join(shlex.quote(c) for c in cmd))
cloud_sql_hook = CloudSQLHook(api_version="v1", gcp_conn_id=self.gcp_conn_id)

with cloud_sql_hook.provide_authorized_gcloud():
proc = subprocess.run(cmd, capture_output=True)

if proc.returncode != 0:
stderr_last_20_lines = "\n".join(proc.stderr.decode().strip().splitlines()[-20:])
raise AirflowException(
f"Process exited with non-zero exit code. Exit code: {proc.returncode}. Error Details: "
f"{stderr_last_20_lines}"
)

auth_token = proc.stdout.decode().strip()
return auth_token
Loading