Improve docstrings in providers (#31681)
uranusjr committed Jun 4, 2023
1 parent 340f706 commit 9276310
Showing 29 changed files with 384 additions and 422 deletions.
8 changes: 4 additions & 4 deletions airflow/providers/apache/hive/operators/hive_stats.py
@@ -33,12 +33,12 @@


 class HiveStatsCollectionOperator(BaseOperator):
-    """
-    Gathers partition statistics using a dynamically generated Presto
-    query, inserts the stats into a MySql table with this format. Stats
+    """Gathers partition statistics using a dynamically generated Presto query.
+
+    The collected stats are inserted into a MySQL table with this format. Stats
     overwrite themselves if you rerun the same date/partition.

-    ::
+    .. code-block:: sql

         CREATE TABLE hive_stats (
             ds VARCHAR(16),
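The pattern applied in this hunk recurs throughout the commit: the summary sentence moves onto the opening-quotes line, a blank line follows, and RST directives such as ``.. code-block:: sql`` get blank lines around them. A minimal sketch of the convention, with illustrative names::

    class ExampleOperator(BaseOperator):
        """Do one thing, summarized on the line with the opening quotes.

        Details follow after a blank line, and directives need blank lines
        around them:

        .. code-block:: sql

            SELECT 1
        """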
8 changes: 4 additions & 4 deletions airflow/providers/apache/spark/hooks/spark_submit.py
@@ -166,10 +166,10 @@ def __init__(
         self._env: dict[str, Any] | None = None

     def _resolve_should_track_driver_status(self) -> bool:
-        """
-        Determines whether this hook should poll the spark driver status through
-        subsequent spark-submit status requests after the initial spark-submit request
-        :return: if the driver status should be tracked.
+        """Whether this hook should poll the Spark driver status.
+
+        If this returns True, the hook would send subsequent spark-submit status
+        requests after the initial spark-submit request.
         """
         return "spark://" in self._connection["master"] and self._connection["deploy_mode"] == "cluster"
18 changes: 13 additions & 5 deletions airflow/providers/asana/operators/asana_tasks.py
@@ -28,8 +28,11 @@

 class AsanaCreateTaskOperator(BaseOperator):
     """
-    This operator can be used to create Asana tasks. For more information on
-    Asana optional task parameters, see https://developers.asana.com/docs/create-a-task.
+    This operator can be used to create Asana tasks.
+
+    .. seealso::
+        For more information on Asana optional task parameters:
+        https://developers.asana.com/docs/create-a-task

     .. seealso::
         For more information on how to use this operator, take a look at the guide:
@@ -67,8 +70,10 @@ def execute(self, context: Context) -> str:
 class AsanaUpdateTaskOperator(BaseOperator):
     """
     This operator can be used to update Asana tasks.
-    For more information on Asana optional task parameters, see
-    https://developers.asana.com/docs/update-a-task.
+
+    .. seealso::
+        For more information on Asana optional task parameters:
+        https://developers.asana.com/docs/update-a-task

     .. seealso::
         For more information on how to use this operator, take a look at the guide:
@@ -133,7 +138,10 @@ def execute(self, context: Context) -> None:
 class AsanaFindTaskOperator(BaseOperator):
     """
     This operator can be used to retrieve Asana tasks that match various filters.
-    See https://developers.asana.com/docs/update-a-task for a list of possible filters.
+
+    .. seealso::
+        For a list of possible filters:
+        https://developers.asana.com/docs/update-a-task

     .. seealso::
         For more information on how to use this operator, take a look at the guide:
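A hypothetical usage sketch for the create variant described above (the connection id, task name, and parameters are illustrative, not from this commit)::

    from airflow.providers.asana.operators.asana_tasks import AsanaCreateTaskOperator

    create_task = AsanaCreateTaskOperator(
        task_id="create_asana_task",
        conn_id="asana_default",
        name="New task from Airflow",
        # Optional fields accepted by the Asana create-a-task endpoint:
        task_parameters={"notes": "Created by a DAG"},
    )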
7 changes: 4 additions & 3 deletions airflow/providers/atlassian/jira/operators/jira.py
@@ -27,9 +27,10 @@


 class JiraOperator(BaseOperator):
-    """
-    JiraOperator to interact and perform action on Jira issue tracking system.
-    This operator is designed to use Atlassian Jira SDK: https://atlassian-python-api.readthedocs.io/jira.html.
+    """JiraOperator to interact and perform action on Jira issue tracking system.
+
+    This operator is designed to use Atlassian Jira SDK. For more information:
+    https://atlassian-python-api.readthedocs.io/jira.html

     :param jira_conn_id: Reference to a pre-defined Jira Connection.
     :param jira_method: Method name from Atlassian Jira Python SDK to be called.
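A hypothetical usage sketch of the operator (the connection id and method arguments are illustrative; ``issue_create`` is assumed to be one of the Atlassian SDK method names)::

    from airflow.providers.atlassian.jira.operators.jira import JiraOperator

    create_issue = JiraOperator(
        task_id="create_jira_issue",
        jira_conn_id="jira_default",
        jira_method="issue_create",  # a method exposed by the Atlassian Jira SDK
        jira_method_args={"fields": {"summary": "Found a bug"}},
    )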
29 changes: 14 additions & 15 deletions airflow/providers/common/sql/hooks/sql.py
@@ -273,10 +273,10 @@ def run(
         split_statements: bool = False,
         return_last: bool = True,
     ) -> Any | list[Any] | None:
-        """
-        Runs a command or a list of commands. Pass a list of sql
-        statements to the sql parameter to get them to execute
-        sequentially.
+        """Run a command or a list of commands.
+
+        Pass a list of SQL statements to the sql parameter to get them to
+        execute sequentially.

         The method will return either single query results (typically list of rows) or list of those results
         where each element in the list are results of one of the queries (typically list of list of rows :D)
@@ -392,14 +392,12 @@ def set_autocommit(self, conn, autocommit):
             conn.autocommit = autocommit

     def get_autocommit(self, conn) -> bool:
-        """
-        Get autocommit setting for the provided connection.
-        Return True if conn.autocommit is set to True.
-        Return False if conn.autocommit is not set or set to False or conn
-        does not support autocommit.
+        """Get autocommit setting for the provided connection.

         :param conn: Connection to get autocommit setting from.
-        :return: connection autocommit setting.
+        :return: connection autocommit setting. True if ``autocommit`` is set
+            to True on the connection. False if it is either not set, set to
+            False, or the connection does not support auto-commit.
         """
         return getattr(conn, "autocommit", False) and self.supports_autocommit

@@ -409,8 +407,8 @@ def get_cursor(self):

     @classmethod
     def _generate_insert_sql(cls, table, values, target_fields, replace, **kwargs) -> str:
-        """
-        Helper class method that generates the INSERT SQL statement.
+        """Helper class method that generates the INSERT SQL statement.
+
         The REPLACE variant is specific to MySQL syntax.

         :param table: Name of the target table
@@ -437,9 +435,10 @@ def _generate_insert_sql(cls, table, values, target_fields, replace, **kwargs) -> str:
         return sql

     def insert_rows(self, table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs):
-        """
-        A generic way to insert a set of tuples into a table,
-        a new transaction is created every commit_every rows.
+        """Insert a collection of tuples into a table.
+
+        Rows are inserted in chunks, each chunk (of size ``commit_every``) is
+        done in a new transaction.

         :param table: Name of the target table
         :param rows: The rows to insert into the table
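A hypothetical sketch of the two behaviors documented above, assuming a ``DbApiHook`` subclass such as ``PostgresHook`` (connection id, table, and data are illustrative)::

    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id="my_postgres")

    # A list of statements runs sequentially; with the default
    # return_last=True, only the final statement's result is returned.
    count = hook.run(
        sql=[
            "CREATE TABLE IF NOT EXISTS t (id INT)",
            "SELECT COUNT(*) FROM t",
        ],
        handler=lambda cursor: cursor.fetchall(),
    )

    # insert_rows() commits in chunks: 5000 rows with commit_every=1000
    # means five transactions of 1000 rows each.
    hook.insert_rows(table="t", rows=[(i,) for i in range(5000)], commit_every=1000)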
20 changes: 12 additions & 8 deletions airflow/providers/common/sql/sensors/sql.py
@@ -25,14 +25,18 @@


 class SqlSensor(BaseSensorOperator):
-    """
-    Runs a sql statement repeatedly until a criteria is met. It will keep trying until
-    success or failure criteria are met, or if the first cell is not in (0, '0', '', None).
-    Optional success and failure callables are called with the first cell returned as the argument.
-    If success callable is defined the sensor will keep retrying until the criteria is met.
-    If failure callable is defined and the criteria is met the sensor will raise AirflowException.
-    Failure criteria is evaluated before success criteria. A fail_on_empty boolean can also
-    be passed to the sensor in which case it will fail if no rows have been returned.
+    """Run a sql statement repeatedly until a criteria is met.
+
+    This will keep trying until success or failure criteria are met, or if the
+    first cell is not either ``0``, ``'0'``, ``''``, or ``None``. Optional
+    success and failure callables are called with the first cell returned as the
+    argument.
+
+    If success callable is defined, the sensor will keep retrying until the
+    criteria is met. If failure callable is defined, and the criteria is met,
+    the sensor will raise AirflowException. Failure criteria is evaluated before
+    success criteria. A fail_on_empty boolean can also be passed to the sensor
+    in which case it will fail if no rows have been returned.

     :param conn_id: The connection to run the sensor against
     :param sql: The sql to run. To pass, it needs to return at least one cell
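A hypothetical usage sketch of the criteria described above (the connection id and query are illustrative)::

    from airflow.providers.common.sql.sensors.sql import SqlSensor

    wait_for_rows = SqlSensor(
        task_id="wait_for_rows",
        conn_id="my_db",
        sql="SELECT COUNT(*) FROM events WHERE ds = '{{ ds }}'",
        success=lambda first_cell: first_cell > 100,  # poke until this is True
        failure=lambda first_cell: first_cell < 0,    # AirflowException when True
        fail_on_empty=True,  # fail outright when the query returns no rows
        poke_interval=60,
    )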
11 changes: 5 additions & 6 deletions airflow/providers/databricks/hooks/databricks_sql.py
@@ -31,8 +31,7 @@


 class DatabricksSqlHook(BaseDatabricksHook, DbApiHook):
-    """
-    Hook to interact with Databricks SQL.
+    """Hook to interact with Databricks SQL.

     :param databricks_conn_id: Reference to the
         :ref:`Databricks connection <howto/connection:databricks>`.
@@ -148,10 +147,10 @@ def run(
         split_statements: bool = True,
         return_last: bool = True,
     ) -> Any | list[Any] | None:
-        """
-        Runs a command or a list of commands. Pass a list of sql
-        statements to the sql parameter to get them to execute
-        sequentially.
+        """Runs a command or a list of commands.
+
+        Pass a list of SQL statements to the SQL parameter to get them to
+        execute sequentially.

         :param sql: the sql statement to be executed (str) or a list of
             sql statements to execute
27 changes: 14 additions & 13 deletions airflow/providers/dbt/cloud/sensors/dbt.py
@@ -30,8 +30,7 @@


 class DbtCloudJobRunSensor(BaseSensorOperator):
-    """
-    Checks the status of a dbt Cloud job run.
+    """Checks the status of a dbt Cloud job run.

     .. seealso::
         For more information on how to use this sensor, take a look at the guide:
@@ -91,9 +90,11 @@ def poke(self, context: Context) -> bool:
         return job_run_status == DbtCloudJobRunStatus.SUCCESS.value

     def execute(self, context: Context) -> None:
-        """
-        Defers to Trigger class to poll for state of the job run until
-        it reaches a failure state or success state.
+        """Run the sensor.
+
+        Depending on whether ``deferrable`` is set, this would either defer to
+        the triggerer or poll for states of the job run, until the job reaches a
+        failure state or success state.
         """
         if not self.deferrable:
             super().execute(context)
@@ -113,10 +114,10 @@ def execute(self, context: Context) -> None:
         )

     def execute_complete(self, context: Context, event: dict[str, Any]) -> int:
-        """
-        Callback for when the trigger fires - returns immediately.
-        Relies on trigger to throw an exception, otherwise it assumes execution was
-        successful.
+        """Callback for when the trigger fires - returns immediately.
+
+        This relies on trigger to throw an exception, otherwise it assumes
+        execution was successful.
         """
         if event["status"] in ["error", "cancelled"]:
             raise AirflowException("Error in dbt: " + event["message"])
@@ -125,10 +126,10 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> int:


 class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor):
-    """
-    This class is deprecated.
-    Please use
-    :class:`airflow.providers.dbt.cloud.sensor.dbt.DbtCloudJobRunSensor`.
+    """This class is deprecated.
+
+    Please use :class:`airflow.providers.dbt.cloud.sensor.dbt.DbtCloudJobRunSensor`
+    with ``deferrable=True``.
     """

     def __init__(self, **kwargs: Any) -> None:
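A hypothetical sketch of the deferrable path described in ``execute`` above (the run id is illustrative and would normally come from XCom or an upstream operator)::

    from airflow.providers.dbt.cloud.sensors.dbt import DbtCloudJobRunSensor

    wait_for_job = DbtCloudJobRunSensor(
        task_id="wait_for_dbt_job",
        run_id=12345,
        deferrable=True,  # hand polling to the triggerer instead of a worker slot
    )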
12 changes: 4 additions & 8 deletions airflow/providers/dbt/cloud/triggers/dbt.py
@@ -25,9 +25,9 @@


 class DbtCloudRunJobTrigger(BaseTrigger):
-    """
-    DbtCloudRunJobTrigger is triggered with run id and account id, makes async Http call to dbt and
-    get the status for the submitted job with run id in polling interval of time.
+    """Trigger to make an HTTP call to dbt and get the status for the job.
+
+    This is done with run id in polling interval of time.

     :param conn_id: The connection identifier for connecting to Dbt.
     :param run_id: The ID of a dbt Cloud job.
@@ -108,11 +108,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             yield TriggerEvent({"status": "error", "message": str(e), "run_id": self.run_id})

     async def is_still_running(self, hook: DbtCloudHook) -> bool:
-        """
-        Async function to check whether the job is submitted via async API is in
-        running state and returns True if it is still running else
-        return False.
-        """
+        """Check whether the submitted job is running."""
         job_run_status = await hook.get_job_status(self.run_id, self.account_id)
         if not DbtCloudJobRunStatus.is_terminal(job_run_status):
             return True
20 changes: 8 additions & 12 deletions airflow/providers/dingding/hooks/dingding.py
@@ -27,10 +27,10 @@


 class DingdingHook(HttpHook):
-    """
-    This hook allows you send Dingding message using Dingding custom bot.
-    Get Dingding token from conn_id.password. And prefer set domain to
-    conn_id.host, if not will use default ``https://oapi.dingtalk.com``.
+    """Send message using a custom Dingding bot.
+
+    Get Dingding token from the connection's ``password``. If ``host`` is not
+    set in the connection, the default ``https://oapi.dingtalk.com`` is used.

     For more detail message in
     `Dingding custom bot <https://open-doc.dingtalk.com/microapp/serverapi2/qf2nxq>`_
@@ -75,11 +75,7 @@ def _get_endpoint(self) -> str:
         return f"robot/send?access_token={token}"

     def _build_message(self) -> str:
-        """
-        Build different type of Dingding message
-        As most commonly used type, text message just need post message content
-        rather than a dict like ``{'content': 'message'}``.
-        """
+        """Build different type of Dingding messages."""
         if self.message_type in ["text", "markdown"]:
             data = {
                 "msgtype": self.message_type,
@@ -91,9 +87,9 @@ def _build_message(self) -> str:
         return json.dumps(data)

     def get_conn(self, headers: dict | None = None) -> Session:
-        """
-        Overwrite HttpHook get_conn because just need base_url and headers and
-        not don't need generic params.
+        """Overwrite HttpHook get_conn.
+
+        We just need base_url and headers, and don't need generic params.

         :param headers: additional headers to be passed through as a dictionary
         """
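A hypothetical usage sketch of the hook described above (the connection id and message are illustrative)::

    from airflow.providers.dingding.hooks.dingding import DingdingHook

    hook = DingdingHook(
        dingding_conn_id="dingding_default",
        message_type="text",
        message="Job finished",  # a plain string suffices for the "text" type
        at_all=True,
    )
    hook.send()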
23 changes: 12 additions & 11 deletions airflow/providers/docker/operators/docker.py
@@ -54,8 +54,7 @@ def stringify(line: str | bytes):


 class DockerOperator(BaseOperator):
-    """
-    Execute a command inside a docker container.
+    """Execute a command inside a docker container.

     By default, a temporary directory is
     created on the host and mounted into a container to allow storing files
@@ -432,9 +431,10 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> list[
             self.cli.remove_container(self.container["Id"], force=True)

     def _attempt_to_retrieve_result(self):
-        """
-        Attempts to pull the result of the function from the expected file using docker's
-        get_archive function. If the file is not yet ready, returns None.
+        """Attempt to pull the result from the expected file.
+
+        This uses Docker's ``get_archive`` function. If the file is not yet
+        ready, *None* is returned.
         """

         def copy_from_docker(container_id, src):
@@ -477,8 +477,10 @@ def execute(self, context: Context) -> list[str] | str | None:

     @staticmethod
     def format_command(command: list[str] | str | None) -> list[str] | str | None:
-        """
-        Retrieve command(s). if command string starts with [, it returns the command list).
+        """Retrieve command(s).
+
+        If command string starts with ``[``, the string is treated as a Python
+        literal and parsed into a list of commands.

         :param command: Docker command or entrypoint
@@ -498,11 +500,10 @@ def on_kill(self) -> None:

     @staticmethod
     def unpack_environment_variables(env_str: str) -> dict:
-        r"""
-        Parse environment variables from the string.
-
-        :param env_str: environment variables in key=value format separated by '\n'
+        r"""Parse environment variables from the string.
+
+        :param env_str: environment variables in the ``{key}={value}`` format,
+            separated by a ``\n`` (newline)
         :return: dictionary containing parsed environment variables
         """
         return dotenv_values(stream=StringIO(env_str))
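A hypothetical sketch of the two static helpers documented above, using only the semantics the docstrings describe::

    from airflow.providers.docker.operators.docker import DockerOperator

    # A string starting with "[" is parsed as a Python literal into a list:
    DockerOperator.format_command("['bash', '-c', 'echo hi']")
    # -> ['bash', '-c', 'echo hi']

    # Any other string passes through unchanged:
    DockerOperator.format_command("echo hi")
    # -> 'echo hi'

    # Newline-separated {key}={value} pairs become a dict:
    DockerOperator.unpack_environment_variables("A=1\nB=2")
    # -> {'A': '1', 'B': '2'}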
5 changes: 1 addition & 4 deletions airflow/providers/elasticsearch/log/es_json_formatter.py
@@ -22,10 +22,7 @@


 class ElasticsearchJSONFormatter(JSONFormatter):
-    """
-    ElasticsearchJSONFormatter instances are used to convert a log record
-    to json with ISO 8601 date and time format.
-    """
+    """Convert a log record to JSON with ISO 8601 date and time format."""

     default_time_format = "%Y-%m-%dT%H:%M:%S"
     default_msec_format = "%s.%03d"
