diff --git a/airflow/providers/apache/hive/operators/hive_stats.py b/airflow/providers/apache/hive/operators/hive_stats.py index 16b9c6120089f..cb839a351226e 100644 --- a/airflow/providers/apache/hive/operators/hive_stats.py +++ b/airflow/providers/apache/hive/operators/hive_stats.py @@ -33,12 +33,12 @@ class HiveStatsCollectionOperator(BaseOperator): - """ - Gathers partition statistics using a dynamically generated Presto - query, inserts the stats into a MySql table with this format. Stats + """Gathers partition statistics using a dynamically generated Presto query. + + The collected stats are inserted into a MySQL table with this format. Stats overwrite themselves if you rerun the same date/partition. - :: + .. code-block:: sql CREATE TABLE hive_stats ( ds VARCHAR(16), diff --git a/airflow/providers/apache/spark/hooks/spark_submit.py b/airflow/providers/apache/spark/hooks/spark_submit.py index e3fcce73987df..999e3c6b9ee82 100644 --- a/airflow/providers/apache/spark/hooks/spark_submit.py +++ b/airflow/providers/apache/spark/hooks/spark_submit.py @@ -166,10 +166,10 @@ def __init__( self._env: dict[str, Any] | None = None def _resolve_should_track_driver_status(self) -> bool: - """ - Determines whether this hook should poll the spark driver status through - subsequent spark-submit status requests after the initial spark-submit request - :return: if the driver status should be tracked. + """Whether this hook should poll the Spark driver status. + + If this returns True, the hook would send subsequent spark-submit status + requests after the initial spark-submit request. """ return "spark://" in self._connection["master"] and self._connection["deploy_mode"] == "cluster" diff --git a/airflow/providers/asana/operators/asana_tasks.py b/airflow/providers/asana/operators/asana_tasks.py index b848d8d7005df..cb3ec87eea284 100644 --- a/airflow/providers/asana/operators/asana_tasks.py +++ b/airflow/providers/asana/operators/asana_tasks.py @@ -28,8 +28,11 @@ class AsanaCreateTaskOperator(BaseOperator): """ - This operator can be used to create Asana tasks. For more information on - Asana optional task parameters, see https://developers.asana.com/docs/create-a-task. + This operator can be used to create Asana tasks. + + .. seealso:: + For more information on Asana optional task parameters: + https://developers.asana.com/docs/create-a-task .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -67,8 +70,10 @@ def execute(self, context: Context) -> str: class AsanaUpdateTaskOperator(BaseOperator): """ This operator can be used to update Asana tasks. - For more information on Asana optional task parameters, see - https://developers.asana.com/docs/update-a-task. + + .. seealso:: + For more information on Asana optional task parameters: + https://developers.asana.com/docs/update-a-task .. seealso:: For more information on how to use this operator, take a look at the guide: @@ -133,7 +138,10 @@ def execute(self, context: Context) -> None: class AsanaFindTaskOperator(BaseOperator): """ This operator can be used to retrieve Asana tasks that match various filters. - See https://developers.asana.com/docs/update-a-task for a list of possible filters. + + .. seealso:: + For a list of possible filters: + https://developers.asana.com/docs/update-a-task .. 
seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/atlassian/jira/operators/jira.py b/airflow/providers/atlassian/jira/operators/jira.py index 8661f70547d43..e5d806b167b4a 100644 --- a/airflow/providers/atlassian/jira/operators/jira.py +++ b/airflow/providers/atlassian/jira/operators/jira.py @@ -27,9 +27,10 @@ class JiraOperator(BaseOperator): - """ - JiraOperator to interact and perform action on Jira issue tracking system. - This operator is designed to use Atlassian Jira SDK: https://atlassian-python-api.readthedocs.io/jira.html. + """JiraOperator to interact with and perform actions on the Jira issue tracking system. + + This operator is designed to use the Atlassian Jira SDK. For more information: + https://atlassian-python-api.readthedocs.io/jira.html :param jira_conn_id: Reference to a pre-defined Jira Connection. :param jira_method: Method name from Atlassian Jira Python SDK to be called. diff --git a/airflow/providers/common/sql/hooks/sql.py b/airflow/providers/common/sql/hooks/sql.py index 45d90bce71307..b766e064db950 100644 --- a/airflow/providers/common/sql/hooks/sql.py +++ b/airflow/providers/common/sql/hooks/sql.py @@ -273,10 +273,10 @@ def run( split_statements: bool = False, return_last: bool = True, ) -> Any | list[Any] | None: - """ - Runs a command or a list of commands. Pass a list of sql - statements to the sql parameter to get them to execute - sequentially. + """Run a command or a list of commands. + + Pass a list of SQL statements to the sql parameter to get them to + execute sequentially. The method will return either single query results (typically list of rows) or list of those results where each element in the list are results of one of the queries (typically list of list of rows :D) @@ -392,14 +392,12 @@ def set_autocommit(self, conn, autocommit): conn.autocommit = autocommit def get_autocommit(self, conn) -> bool: - """ - Get autocommit setting for the provided connection. - Return True if conn.autocommit is set to True. - Return False if conn.autocommit is not set or set to False or conn - does not support autocommit. + """Get autocommit setting for the provided connection. :param conn: Connection to get autocommit setting from. - :return: connection autocommit setting. + :return: connection autocommit setting. True if ``autocommit`` is set + to True on the connection. False if it is either not set, set to + False, or the connection does not support auto-commit. """ return getattr(conn, "autocommit", False) and self.supports_autocommit @@ -409,8 +407,8 @@ def get_cursor(self): @classmethod def _generate_insert_sql(cls, table, values, target_fields, replace, **kwargs) -> str: - """ - Helper class method that generates the INSERT SQL statement. + """Helper class method that generates the INSERT SQL statement. + The REPLACE variant is specific to MySQL syntax. :param table: Name of the target table @@ -437,9 +435,10 @@ def _generate_insert_sql(cls, table, values, target_fields, replace, **kwargs) - return sql def insert_rows(self, table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs): - """ - A generic way to insert a set of tuples into a table, - a new transaction is created every commit_every rows. + """Insert a collection of tuples into a table. + + Rows are inserted in chunks; each chunk (of size ``commit_every``) is + done in a new transaction.
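+ + A minimal usage sketch (the connection ID, table, and rows here are illustrative, and ``PostgresHook`` stands in for any ``DbApiHook`` subclass): + + .. code-block:: python + + from airflow.providers.postgres.hooks.postgres import PostgresHook + + # Illustrative connection ID; any DbApiHook subclass works the same way. + hook = PostgresHook(postgres_conn_id="my_postgres_conn") + hook.insert_rows( + table="my_table", + rows=[(1, "foo"), (2, "bar")], + target_fields=["id", "name"], + commit_every=1000,  # one transaction per 1000 rows + )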
:param table: Name of the target table :param rows: The rows to insert into the table diff --git a/airflow/providers/common/sql/sensors/sql.py b/airflow/providers/common/sql/sensors/sql.py index 054e9e79b22c3..73505390fc0b8 100644 --- a/airflow/providers/common/sql/sensors/sql.py +++ b/airflow/providers/common/sql/sensors/sql.py @@ -25,14 +25,18 @@ class SqlSensor(BaseSensorOperator): - """ - Runs a sql statement repeatedly until a criteria is met. It will keep trying until - success or failure criteria are met, or if the first cell is not in (0, '0', '', None). - Optional success and failure callables are called with the first cell returned as the argument. - If success callable is defined the sensor will keep retrying until the criteria is met. - If failure callable is defined and the criteria is met the sensor will raise AirflowException. - Failure criteria is evaluated before success criteria. A fail_on_empty boolean can also - be passed to the sensor in which case it will fail if no rows have been returned. + """Run a SQL statement repeatedly until a criterion is met. + + This will keep trying until success or failure criteria are met, or if the + first cell is not one of ``0``, ``'0'``, ``''``, or ``None``. Optional + success and failure callables are called with the first cell returned as the + argument. + + If the success callable is defined, the sensor will keep retrying until its + criteria are met. If the failure callable is defined, and its criteria are met, + the sensor will raise AirflowException. Failure criteria are evaluated before + success criteria. A fail_on_empty boolean can also be passed to the sensor + in which case it will fail if no rows have been returned. :param conn_id: The connection to run the sensor against :param sql: The sql to run. To pass, it needs to return at least one cell diff --git a/airflow/providers/databricks/hooks/databricks_sql.py b/airflow/providers/databricks/hooks/databricks_sql.py index 6a09d500c6494..50ff7615914d5 100644 --- a/airflow/providers/databricks/hooks/databricks_sql.py +++ b/airflow/providers/databricks/hooks/databricks_sql.py @@ -31,8 +31,7 @@ class DatabricksSqlHook(BaseDatabricksHook, DbApiHook): - """ - Hook to interact with Databricks SQL. + """Hook to interact with Databricks SQL. :param databricks_conn_id: Reference to the :ref:`Databricks connection `. @@ -148,10 +147,10 @@ def run( split_statements: bool = True, return_last: bool = True, ) -> Any | list[Any] | None: - """ - Runs a command or a list of commands. Pass a list of sql - statements to the sql parameter to get them to execute - sequentially. + """Run a command or a list of commands. + + Pass a list of SQL statements to the sql parameter to get them to + execute sequentially. :param sql: the sql statement to be executed (str) or a list of sql statements to execute diff --git a/airflow/providers/dbt/cloud/sensors/dbt.py b/airflow/providers/dbt/cloud/sensors/dbt.py index e87ebd332a83b..5838f6d6247d2 100644 --- a/airflow/providers/dbt/cloud/sensors/dbt.py +++ b/airflow/providers/dbt/cloud/sensors/dbt.py @@ -30,8 +30,7 @@ class DbtCloudJobRunSensor(BaseSensorOperator): - """ - Checks the status of a dbt Cloud job run. + """Checks the status of a dbt Cloud job run. ..
seealso:: For more information on how to use this sensor, take a look at the guide: @@ -91,9 +90,11 @@ def poke(self, context: Context) -> bool: return job_run_status == DbtCloudJobRunStatus.SUCCESS.value def execute(self, context: Context) -> None: - """ - Defers to Trigger class to poll for state of the job run until - it reaches a failure state or success state. + """Run the sensor. + + Depending on whether ``deferrable`` is set, this will either defer to + the triggerer or poll for the state of the job run, until the job reaches a + failure state or success state. """ if not self.deferrable: super().execute(context) @@ -113,10 +114,10 @@ def execute(self, context: Context) -> None: ) def execute_complete(self, context: Context, event: dict[str, Any]) -> int: - """ - Callback for when the trigger fires - returns immediately. - Relies on trigger to throw an exception, otherwise it assumes execution was - successful. + """Callback for when the trigger fires - returns immediately. + + This relies on the trigger to throw an exception; otherwise, it assumes + execution was successful. """ if event["status"] in ["error", "cancelled"]: raise AirflowException("Error in dbt: " + event["message"]) @@ -125,10 +126,10 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> int: class DbtCloudJobRunAsyncSensor(DbtCloudJobRunSensor): - """ - This class is deprecated. - Please use - :class:`airflow.providers.dbt.cloud.sensor.dbt.DbtCloudJobRunSensor`. + """This class is deprecated. + + Please use :class:`airflow.providers.dbt.cloud.sensor.dbt.DbtCloudJobRunSensor` + with ``deferrable=True``. """ def __init__(self, **kwargs: Any) -> None: diff --git a/airflow/providers/dbt/cloud/triggers/dbt.py b/airflow/providers/dbt/cloud/triggers/dbt.py index 1e07080018ff1..eaca3c12f801e 100644 --- a/airflow/providers/dbt/cloud/triggers/dbt.py +++ b/airflow/providers/dbt/cloud/triggers/dbt.py @@ -25,9 +25,9 @@ class DbtCloudRunJobTrigger(BaseTrigger): - """ - DbtCloudRunJobTrigger is triggered with run id and account id, makes async Http call to dbt and - get the status for the submitted job with run id in polling interval of time. + """Trigger to make an HTTP call to dbt and get the status for the job. + + The status of the submitted job run is checked at a regular polling interval, using the run ID. :param conn_id: The connection identifier for connecting to Dbt. :param run_id: The ID of a dbt Cloud job. @@ -108,11 +108,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: yield TriggerEvent({"status": "error", "message": str(e), "run_id": self.run_id}) async def is_still_running(self, hook: DbtCloudHook) -> bool: - """ - Async function to check whether the job is submitted via async API is in - running state and returns True if it is still running else - return False. - """ + """Check whether the submitted job is running.""" job_run_status = await hook.get_job_status(self.run_id, self.account_id) if not DbtCloudJobRunStatus.is_terminal(job_run_status): return True diff --git a/airflow/providers/dingding/hooks/dingding.py b/airflow/providers/dingding/hooks/dingding.py index 2edb7682ccfec..f1d119fe72ae9 100644 --- a/airflow/providers/dingding/hooks/dingding.py +++ b/airflow/providers/dingding/hooks/dingding.py @@ -27,10 +27,10 @@ class DingdingHook(HttpHook): - """ - This hook allows you send Dingding message using Dingding custom bot. - Get Dingding token from conn_id.password. And prefer set domain to - conn_id.host, if not will use default ``https://oapi.dingtalk.com``. + """Send a message using a custom Dingding bot.
+ + Get Dingding token from the connection's ``password``. If ``host`` is not + set in the connection, the default ``https://oapi.dingtalk.com`` is used. For more detail message in `Dingding custom bot `_ @@ -75,11 +75,7 @@ def _get_endpoint(self) -> str: return f"robot/send?access_token={token}" def _build_message(self) -> str: - """ - Build different type of Dingding message - As most commonly used type, text message just need post message content - rather than a dict like ``{'content': 'message'}``. - """ + """Build different types of Dingding messages.""" if self.message_type in ["text", "markdown"]: data = { "msgtype": self.message_type, @@ -91,9 +87,9 @@ def _build_message(self) -> str: return json.dumps(data) def get_conn(self, headers: dict | None = None) -> Session: - """ - Overwrite HttpHook get_conn because just need base_url and headers and - not don't need generic params. + """Overwrite HttpHook get_conn. + + We just need base_url and headers, and don't need generic params. :param headers: additional headers to be passed through as a dictionary """ diff --git a/airflow/providers/docker/operators/docker.py b/airflow/providers/docker/operators/docker.py index 95abfb50b112e..14e6bef379f43 100644 --- a/airflow/providers/docker/operators/docker.py +++ b/airflow/providers/docker/operators/docker.py @@ -54,8 +54,7 @@ def stringify(line: str | bytes): class DockerOperator(BaseOperator): - """ - Execute a command inside a docker container. + """Execute a command inside a docker container. By default, a temporary directory is created on the host and mounted into a container to allow storing files @@ -432,9 +431,10 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> list[ self.cli.remove_container(self.container["Id"], force=True) def _attempt_to_retrieve_result(self): - """ - Attempts to pull the result of the function from the expected file using docker's - get_archive function. If the file is not yet ready, returns None. + """Attempt to pull the result from the expected file. + + This uses Docker's ``get_archive`` function. If the file is not yet + ready, *None* is returned. """ def copy_from_docker(container_id, src): @@ -477,8 +477,10 @@ def execute(self, context: Context) -> list[str] | str | None: @staticmethod def format_command(command: list[str] | str | None) -> list[str] | str | None: - """ - Retrieve command(s). if command string starts with [, it returns the command list). + """Retrieve command(s). + + If command string starts with ``[``, the string is treated as a Python + literal and parsed into a list of commands. :param command: Docker command or entrypoint @@ -498,11 +500,10 @@ def on_kill(self) -> None: @staticmethod def unpack_environment_variables(env_str: str) -> dict: - r""" - Parse environment variables from the string
- - :param env_str: environment variables in key=value format separated by '\n' + r"""Parse environment variables from the string + :param env_str: environment variables in the ``{key}={value}`` format, + separated by a ``\n`` (newline) :return: dictionary containing parsed environment variables """ return dotenv_values(stream=StringIO(env_str)) diff --git a/airflow/providers/elasticsearch/log/es_json_formatter.py b/airflow/providers/elasticsearch/log/es_json_formatter.py index e4b18c4210d7b..0cc204c20814a 100644 --- a/airflow/providers/elasticsearch/log/es_json_formatter.py +++ b/airflow/providers/elasticsearch/log/es_json_formatter.py @@ -22,10 +22,7 @@ class ElasticsearchJSONFormatter(JSONFormatter): - """ - ElasticsearchJSONFormatter instances are used to convert a log record - to json with ISO 8601 date and time format. - """ + """Convert a log record to JSON with ISO 8601 date and time format.""" default_time_format = "%Y-%m-%dT%H:%M:%S" default_msec_format = "%s.%03d" diff --git a/airflow/providers/exasol/hooks/exasol.py b/airflow/providers/exasol/hooks/exasol.py index 63d9bd7810e69..ac45a7915f5cc 100644 --- a/airflow/providers/exasol/hooks/exasol.py +++ b/airflow/providers/exasol/hooks/exasol.py @@ -28,11 +28,12 @@ class ExasolHook(DbApiHook): - """ - Interact with Exasol. + """Interact with Exasol. + You can specify the pyexasol ``compression``, ``encryption``, ``json_lib`` and ``client_name`` parameters in the extra field of your connection as ``{"compression": True, "json_lib": "rapidjson", etc}``. + See `pyexasol reference `_ for more details. @@ -66,13 +67,14 @@ def get_conn(self) -> ExaConnection: return conn def get_pandas_df(self, sql: str, parameters: dict | None = None, **kwargs) -> pd.DataFrame: - """ - Executes the sql and returns a pandas dataframe. + """Execute the SQL and return a Pandas dataframe. - :param sql: the sql statement to be executed (str) or a list of - sql statements to execute + :param sql: The sql statement to be executed (str) or a list of + sql statements to execute. :param parameters: The parameters to render the SQL query with. - :param kwargs: (optional) passed into pyexasol.ExaConnection.export_to_pandas method + + Other keyword arguments are all forwarded into + ``pyexasol.ExaConnection.export_to_pandas``. """ with closing(self.get_conn()) as conn: df = conn.export_to_pandas(sql, query_params=parameters, **kwargs) @@ -83,8 +85,7 @@ def get_records( sql: str | list[str], parameters: Iterable | Mapping | None = None, ) -> list[dict | tuple[Any, ...]]: - """ - Executes the sql and returns a set of records. + """Execute the SQL and return a set of records. :param sql: the sql statement to be executed (str) or a list of sql statements to execute @@ -95,8 +96,7 @@ def get_records( return cur.fetchall() def get_first(self, sql: str | list[str], parameters: Iterable | Mapping | None = None) -> Any: - """ - Executes the sql and returns the first resulting row. + """Execute the SQL and return the first resulting row. :param sql: the sql statement to be executed (str) or a list of sql statements to execute @@ -113,8 +113,7 @@ def export_to_file( query_params: dict | None = None, export_params: dict | None = None, ) -> None: - """ - Exports data to a file. + """Export data to a file. 
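+ + A short usage sketch (the connection ID, output path, and query are illustrative): + + .. code-block:: python + + from airflow.providers.exasol.hooks.exasol import ExasolHook + + hook = ExasolHook(exasol_conn_id="exasol_default") + # Export the result of a query to a local CSV file. + hook.export_to_file( + filename="/tmp/my_export.csv", + query_or_table="SELECT * FROM my_schema.my_table", + )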
:param filename: Path to the file to which the data has to be exported :param query_or_table: the sql statement to be executed or table name to export @@ -135,10 +134,10 @@ def export_to_file( @staticmethod def get_description(statement: ExaStatement) -> Sequence[Sequence]: - """ - Copied implementation from DB2-API wrapper. + """Copied implementation from DB2-API wrapper. - More info https://github.com/exasol/pyexasol/blob/master/docs/DBAPI_COMPAT.md#db-api-20-wrapper + For more info, see + https://github.com/exasol/pyexasol/blob/master/docs/DBAPI_COMPAT.md#db-api-20-wrapper :param statement: Exasol statement :return: description sequence of t @@ -167,10 +166,10 @@ def run( split_statements: bool = False, return_last: bool = True, ) -> Any | list[Any] | None: - """ - Runs a command or a list of commands. Pass a list of sql - statements to the sql parameter to get them to execute - sequentially. + """Run a command or a list of commands. + + Pass a list of SQL statements to the SQL parameter to get them to + execute sequentially. :param sql: the sql statement to be executed (str) or a list of sql statements to execute @@ -226,8 +225,7 @@ def run( return results def set_autocommit(self, conn, autocommit: bool) -> None: - """ - Sets the autocommit flag on the connection. + """Set the autocommit flag on the connection. :param conn: Connection to set autocommit setting to. :param autocommit: The autocommit setting to set. @@ -240,14 +238,12 @@ def set_autocommit(self, conn, autocommit: bool) -> None: conn.set_autocommit(autocommit) def get_autocommit(self, conn) -> bool: - """ - Get autocommit setting for the provided connection. - Return True if autocommit is set. - Return False if autocommit is not set or set to False or conn - does not support autocommit. + """Get autocommit setting for the provided connection. :param conn: Connection to get autocommit setting from. - :return: connection autocommit setting. + :return: connection autocommit setting. True if ``autocommit`` is set + to True on the connection. False if it is either not set, set to + False, or the connection does not support auto-commit. """ autocommit = conn.attr.get("autocommit") if autocommit is None: @@ -256,8 +252,9 @@ def get_autocommit(self, conn) -> bool: @staticmethod def _serialize_cell(cell, conn=None) -> Any: - """ - Exasol will adapt all arguments to the execute() method internally, + """Override to disable cell serialization. + + Exasol will adapt all arguments to the ``execute()`` method internally, hence we return cell without any conversion. :param cell: The cell to insert into the table @@ -270,12 +267,10 @@ def _serialize_cell(cell, conn=None) -> Any: def exasol_fetch_all_handler(statement: ExaStatement) -> list[tuple] | None: if statement.result_type == "resultSet": return statement.fetchall() - else: - return None + return None def exasol_fetch_one_handler(statement: ExaStatement) -> list[tuple] | None: if statement.result_type == "resultSet": return statement.fetchone() - else: - return None + return None diff --git a/airflow/providers/facebook/ads/hooks/ads.py b/airflow/providers/facebook/ads/hooks/ads.py index 8631c7f8bc726..ccfec93e53a7f 100644 --- a/airflow/providers/facebook/ads/hooks/ads.py +++ b/airflow/providers/facebook/ads/hooks/ads.py @@ -43,8 +43,7 @@ class JobStatus(Enum): class FacebookAdsReportingHook(BaseHook): - """ - Hook for the Facebook Ads API. + """Facebook Ads API. .. 
seealso:: For more information on the Facebook Ads API, take a look at the API docs: @@ -88,9 +87,10 @@ def multiple_accounts(self) -> bool: @cached_property def facebook_ads_config(self) -> dict: - """ - Gets Facebook ads connection from meta db and sets - facebook_ads_config attribute with returned config file. + """Get the ``facebook_ads_config`` attribute. + + This fetches the Facebook Ads connection from the meta database, and sets + the ``facebook_ads_config`` attribute with the returned config. """ self.log.info("Fetching fb connection: %s", self.facebook_conn_id) conn = self.get_connection(self.facebook_conn_id) @@ -107,7 +107,7 @@ def bulk_facebook_report( fields: list[str], sleep_time: int = 5, ) -> list[AdsInsights] | dict[str, list[AdsInsights]]: - """Pulls data from the Facebook Ads API regarding Account ID with matching return type. + """Pull data from Facebook Ads API regarding Account ID with matching return type. The return type and value depends on the ``account_id`` configuration. If the configuration is a str representing a single Account ID, the return value is the @@ -152,11 +152,10 @@ def _facebook_report( fields: list[str], sleep_time: int = 5, ) -> list[AdsInsights]: - """ - Pulls data from the Facebook Ads API with given account_id. + """Pull data from the Facebook Ads API with given ``account_id``. :param account_id: Facebook Account ID that holds ads information - https://developers.facebook.com/docs/marketing-api/reference/ads-insights/ + https://developers.facebook.com/docs/marketing-api/reference/ads-insights/ :param api: FacebookAdsApi created in the hook :param fields: List of fields that is obtained from Facebook. Found in AdsInsights.Field class. https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0 diff --git a/airflow/providers/github/operators/github.py b/airflow/providers/github/operators/github.py index 05e274ab7918d..82c9ab3b77abf 100644 --- a/airflow/providers/github/operators/github.py +++ b/airflow/providers/github/operators/github.py @@ -30,9 +30,9 @@ class GithubOperator(BaseOperator): - """ - GithubOperator to interact and perform action on GitHub API. - This operator is designed to use GitHub Python SDK: https://github.com/PyGithub/PyGithub. + """Interact with and perform actions on the GitHub API. + + This operator is designed to use GitHub's Python SDK: https://github.com/PyGithub/PyGithub .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/providers/http/hooks/http.py b/airflow/providers/http/hooks/http.py index 2eefa3a3cd67e..5bff4716a9542 100644 --- a/airflow/providers/http/hooks/http.py +++ b/airflow/providers/http/hooks/http.py @@ -36,8 +36,7 @@ class HttpHook(BaseHook): - """ - Interact with HTTP servers. + """Interact with HTTP servers. :param method: the API method to be called :param http_conn_id: :ref:`http connection` that has the base @@ -89,8 +88,7 @@ def auth_type(self, v): # headers may be passed through directly or in the "extra" field in the connection # definition def get_conn(self, headers: dict[Any, Any] | None = None) -> requests.Session: - """ - Returns http session for use with requests. + """Create a Requests HTTP session. :param headers: additional headers to be passed through as a dictionary """ @@ -131,8 +129,7 @@ def run( extra_options: dict[str, Any] | None = None, **request_kwargs: Any, ) -> Any: - r""" - Performs the request. + r"""Perform the request. :param endpoint: the endpoint to be called i.e. resource/v1/query?
:param data: payload to be uploaded or request parameters @@ -169,11 +166,11 @@ return self.run_and_check(session, prepped_request, extra_options) def check_response(self, response: requests.Response) -> None: - """ - Checks the status code and raise an AirflowException exception on non 2XX or 3XX - status codes. + """Check the status code and raise on failure. - :param response: A requests response object + :param response: A requests response object. + :raise AirflowException: If the response contains a status code not + in the 2xx and 3xx range. """ try: response.raise_for_status() @@ -188,9 +185,7 @@ def run_and_check( self, session: requests.Session, prepped_request: requests.PreparedRequest, extra_options: dict[Any, Any], ) -> Any: - """ - Grabs extra options like timeout and actually runs the request, - checking for the result. + """Grab extra options, actually run the request, and check the result. :param session: the session to be used to execute the request :param prepped_request: the prepared request generated in run() @@ -227,10 +222,10 @@ raise ex def run_with_advanced_retry(self, _retry_args: dict[Any, Any], *args: Any, **kwargs: Any) -> Any: - """ - Runs Hook.run() with a Tenacity decorator attached to it. This is useful for - connectors which might be disturbed by intermittent issues and should not - instantly fail. + """Run the hook with retry. + + This is useful for connectors which might be disturbed by intermittent + issues and should not instantly fail. :param _retry_args: Arguments which define the retry behaviour. See Tenacity documentation at https://github.com/jd/tenacity @@ -267,8 +262,7 @@ def test_connection(self): class HttpAsyncHook(BaseHook): - """ - Interact with HTTP servers using Python Async. + """Interact with HTTP servers asynchronously. :param method: the API method to be called :param http_conn_id: http connection id that has the base @@ -307,14 +301,14 @@ async def run( headers: dict[str, Any] | None = None, extra_options: dict[str, Any] | None = None, ) -> ClientResponse: - r""" - Performs an asynchronous HTTP request call. + """Perform an asynchronous HTTP request call. - :param endpoint: the endpoint to be called i.e. resource/v1/query? - :param data: payload to be uploaded or request parameters - :param headers: additional headers to be passed through as a dictionary + :param endpoint: Endpoint to be called, i.e. ``resource/v1/query?``. + :param data: Payload to be uploaded or request parameters. + :param headers: Additional headers to be passed through as a dict. :param extra_options: Additional kwargs to pass when creating a request. - For example, ``run(json=obj)`` is passed as ``aiohttp.ClientSession().get(json=obj)`` + For example, ``run(json=obj)`` is passed as + ``aiohttp.ClientSession().get(json=obj)``. """ extra_options = extra_options or {} @@ -399,9 +393,7 @@ await asyncio.sleep(self.retry_delay) def _retryable_error_async(self, exception: ClientResponseError) -> bool: - """ - Determines whether or not an exception that was thrown might be successful - on a subsequent attempt. + """Determine whether an exception may be successful on a subsequent attempt.
It considers the following to be retryable: - requests_exceptions.ConnectionError diff --git a/airflow/providers/influxdb/hooks/influxdb.py b/airflow/providers/influxdb/hooks/influxdb.py index 2cbed8cc7f900..a67a2a4666218 100644 --- a/airflow/providers/influxdb/hooks/influxdb.py +++ b/airflow/providers/influxdb/hooks/influxdb.py @@ -35,8 +35,7 @@ class InfluxDBHook(BaseHook): - """ - Interact with InfluxDB. + """Interact with InfluxDB. Performs a connection to InfluxDB and retrieves client. @@ -61,20 +60,13 @@ def get_client(self, uri, token, org_name): return InfluxDBClient(url=uri, token=token, org=org_name) def get_uri(self, conn: Connection): - """ - Function to add additional parameters to the URI - based on SSL or other InfluxDB host requirements. - - """ + """Add additional parameters to the URI based on InfluxDB host requirements.""" conn_scheme = "https" if conn.schema is None else conn.schema conn_port = 7687 if conn.port is None else conn.port return f"{conn_scheme}://{conn.host}:{conn_port}" def get_conn(self) -> InfluxDBClient: - """ - Function that initiates a new InfluxDB connection - with token and organization name. - """ + """Initiate a new InfluxDB connection with token and organization name.""" self.connection = self.get_connection(self.influxdb_conn_id) self.extras = self.connection.extra_dejson.copy() @@ -95,10 +87,9 @@ def get_conn(self) -> InfluxDBClient: return self.client def query(self, query) -> list[FluxTable]: - """ - Function to to run the query. - Note: The bucket name - should be included in the query. + """Run the query. + + Note: The bucket name should be included in the query. :param query: InfluxDB query :return: List @@ -109,11 +100,9 @@ def query(self, query) -> list[FluxTable]: return query_api.query(query) def query_to_df(self, query) -> pd.DataFrame: - """ - Function to run the query and - return a pandas dataframe - Note: The bucket name - should be included in the query. + """Run the query and return a pandas dataframe. + + Note: The bucket name should be included in the query. :param query: InfluxDB query :return: pd.DataFrame @@ -124,9 +113,9 @@ def query_to_df(self, query) -> pd.DataFrame: return query_api.query_data_frame(query) def write(self, bucket_name, point_name, tag_name, tag_value, field_name, field_value, synchronous=False): - """ - Writes a Point to the bucket specified. - Example: Point("my_measurement").tag("location", "Prague").field("temperature", 25.3). + """Write a Point to the bucket specified. 
+ + Example: ``Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)`` """ # By defaults its Batching if synchronous: @@ -139,26 +128,26 @@ def write(self, bucket_name, point_name, tag_name, tag_value, field_name, field_ write_api.write(bucket=bucket_name, record=p) def create_organization(self, name): - """Function to create a new organization.""" + """Create a new organization.""" return self.client.organizations_api().create_organization(name=name) def delete_organization(self, org_id): - """Function to delete organization by organization id.""" + """Delete an organization by ID.""" return self.client.organizations_api().delete_organization(org_id=org_id) def create_bucket(self, bucket_name, description, org_id, retention_rules=None): - """Function to create a bucket for an organization.""" + """Create a bucket for an organization.""" return self.client.buckets_api().create_bucket( bucket_name=bucket_name, description=description, org_id=org_id, retention_rules=None ) def find_bucket_id_by_name(self, bucket_name): - """Function to get bucket id by name.""" + """Get bucket ID by name.""" bucket = self.client.buckets_api().find_bucket_by_name(bucket_name) return "" if bucket is None else bucket.id def delete_bucket(self, bucket_name): - """Function to delete bucket by bucket name.""" + """Delete bucket by name.""" bucket = self.find_bucket_id_by_name(bucket_name) return self.client.buckets_api().delete_bucket(bucket) diff --git a/airflow/providers/jdbc/hooks/jdbc.py b/airflow/providers/jdbc/hooks/jdbc.py index 8959682ccb09d..1d9a3425d6ec0 100644 --- a/airflow/providers/jdbc/hooks/jdbc.py +++ b/airflow/providers/jdbc/hooks/jdbc.py @@ -26,8 +26,7 @@ class JdbcHook(DbApiHook): - """ - General hook for jdbc db access. + """General hook for JDBC access. JDBC URL, username and password will be taken from the predefined connection. Note that the whole JDBC URL must be specified in the "host" field in the DB. @@ -42,7 +41,7 @@ class JdbcHook(DbApiHook): @staticmethod def get_connection_form_widgets() -> dict[str, Any]: - """Returns connection widgets to add to connection form.""" + """Get connection widgets to add to connection form.""" from flask_appbuilder.fieldwidgets import BS3TextFieldWidget from flask_babel import lazy_gettext from wtforms import StringField @@ -54,14 +53,18 @@ def get_connection_form_widgets() -> dict[str, Any]: @staticmethod def get_ui_field_behaviour() -> dict[str, Any]: - """Returns custom field behaviour.""" + """Get custom field behaviour.""" return { "hidden_fields": ["port", "schema", "extra"], "relabeling": {"host": "Connection URL"}, } def _get_field(self, extras: dict, field_name: str): - """Get field from extra, first checking short name, then for backcompat we check for prefixed name.""" + """Get field from extra. + + This first checks the short name, then checks the prefixed name for + backward compatibility. + """ backcompat_prefix = "extra__jdbc__" if field_name.startswith("extra__"): raise ValueError( @@ -91,8 +94,7 @@ def get_conn(self) -> jaydebeapi.Connection: return conn def set_autocommit(self, conn: jaydebeapi.Connection, autocommit: bool) -> None: - """ - Enable or disable autocommit for the given connection. + """Set autocommit for the given connection. :param conn: The connection. :param autocommit: The connection's autocommit setting.
@@ -100,12 +102,11 @@ def set_autocommit(self, conn: jaydebeapi.Connection, autocommit: bool) -> None: conn.jconn.setAutoCommit(autocommit) def get_autocommit(self, conn: jaydebeapi.Connection) -> bool: - """ - Get autocommit setting for the provided connection. - Return True if conn.autocommit is set to True. - Return False if conn.autocommit is not set or set to False. + """Get autocommit setting for the provided connection. - :param conn: The connection. - :return: connection autocommit setting. + :param conn: Connection to get autocommit setting from. + :return: connection autocommit setting. True if ``autocommit`` is set + to True on the connection. False if it is either not set, set to + False, or the connection does not support auto-commit. """ return conn.jconn.getAutoCommit() diff --git a/airflow/providers/jenkins/operators/jenkins_job_trigger.py b/airflow/providers/jenkins/operators/jenkins_job_trigger.py index fe492b647a7be..5b00007748bba 100644 --- a/airflow/providers/jenkins/operators/jenkins_job_trigger.py +++ b/airflow/providers/jenkins/operators/jenkins_job_trigger.py @@ -37,11 +37,11 @@ def jenkins_request_with_headers(jenkins_server: Jenkins, req: Request) -> JenkinsRequest | None: - """ - We need to get the headers in addition to the body answer - to get the location from them - This function uses jenkins_request method from python-jenkins library - with just the return call changed. + """Create a Jenkins request from a raw request. + + We need to get the headers in addition to the response body to get the + location from them. This function uses ``jenkins_request`` from + python-jenkins with just the return call changed. :param jenkins_server: The server to query :param req: The request to execute @@ -73,11 +73,11 @@ def jenkins_request_with_headers(jenkins_server: Jenkins, req: Request) -> Jenki class JenkinsJobTriggerOperator(BaseOperator): - """ - Trigger a Jenkins Job and monitor it's execution. - This operator depend on python-jenkins library, - version >= 0.4.15 to communicate with jenkins server. - You'll also need to configure a Jenkins connection in the connections screen. + """Trigger a Jenkins Job and monitor its execution. + + This operator depends on the python-jenkins library version >= 0.4.15 to + communicate with the Jenkins server. You'll also need to configure a Jenkins + connection in the connections screen. :param jenkins_connection_id: The jenkins connection to use for this job :param job_name: The name of the job to trigger @@ -114,11 +114,11 @@ def __init__( self.allowed_jenkins_states = list(allowed_jenkins_states) if allowed_jenkins_states else ["SUCCESS"] def build_job(self, jenkins_server: Jenkins, params: ParamType = None) -> JenkinsRequest | None: - """ - This function makes an API call to Jenkins to trigger a build for 'job_name' - It returned a dict with 2 keys : body and headers. - headers contains also a dict-like object which can be queried to get - the location to poll in the queue. + """Trigger a build job. + + This returns a dict with 2 keys ``body`` and ``headers``. ``headers`` + also contains a dict-like object which can be queried to get the + location to poll in the queue. :param jenkins_server: The jenkins server where the job should be triggered :param params: The parameters block to provide to jenkins API call.
@@ -134,15 +134,16 @@ def build_job(self, jenkins_server: Jenkins, params: ParamType = None) -> Jenkin return jenkins_request_with_headers(jenkins_server, request) def poll_job_in_queue(self, location: str, jenkins_server: Jenkins) -> int: - """ - This method poll the jenkins queue until the job is executed. - When we trigger a job through an API call, - the job is first put in the queue without having a build number assigned. - Thus we have to wait the job exit the queue to know its build number. - To do so, we have to add /api/json (or /api/xml) to the location - returned by the build_job call and poll this file. - When a 'executable' block appears in the json, it means the job execution started - and the field 'number' then contains the build number. + """Poll the jenkins queue until the job is executed. + + When we trigger a job through an API call, the job is first put in the + queue without having a build number assigned. We have to wait until the + job exits the queue to know its build number. + + To do so, we add ``/api/json`` (or ``/api/xml``) to the location + returned by the ``build_job`` call, and poll this file. When an + ``executable`` block appears in the response, the job execution has + started, and the field ``number`` contains the build number. :param location: Location to poll, returned in the header of the build_job call :param jenkins_server: The jenkins server to poll @@ -184,7 +185,7 @@ def poll_job_in_queue(self, location: str, jenkins_server: Jenkins) -> int: ) def get_hook(self) -> JenkinsHook: - """Instantiate jenkins hook.""" + """Instantiate the Jenkins hook.""" return JenkinsHook(self.jenkins_connection_id) def execute(self, context: Mapping[Any, Any]) -> str | None: diff --git a/airflow/providers/jenkins/sensors/jenkins.py b/airflow/providers/jenkins/sensors/jenkins.py index db9a9a1bf8e6f..39a4cba3de0c8 100644 --- a/airflow/providers/jenkins/sensors/jenkins.py +++ b/airflow/providers/jenkins/sensors/jenkins.py @@ -28,13 +28,10 @@ class JenkinsBuildSensor(BaseSensorOperator): - """ - Monitor a jenkins job and pass when it is finished building. + """Monitor a Jenkins job and pass when it is finished building. This is regardless of the build outcome. - This sensor depend on python-jenkins library. - :param jenkins_connection_id: The jenkins connection to use for this job :param job_name: The name of the job to check :param build_number: Build number to check - if None, the latest build will be used diff --git a/airflow/providers/odbc/hooks/odbc.py b/airflow/providers/odbc/hooks/odbc.py index f643b396e7f3f..72f137ff32a27 100644 --- a/airflow/providers/odbc/hooks/odbc.py +++ b/airflow/providers/odbc/hooks/odbc.py @@ -31,6 +31,15 @@ class OdbcHook(DbApiHook): Interact with odbc data sources using pyodbc. See :doc:`/connections/odbc` for full documentation. + + :param args: passed to DbApiHook + :param database: database to use -- overrides connection ``schema`` + :param driver: name of driver or path to driver. Overrides the driver supplied in the connection ``extra`` + :param dsn: name of DSN to use. Overrides the DSN supplied in the connection ``extra`` + :param connect_kwargs: keyword arguments passed to ``pyodbc.connect`` + :param sqlalchemy_scheme: Scheme for the SQLAlchemy connection. Default is ``mssql+pyodbc``. Only used for + ``get_sqlalchemy_engine`` and ``get_sqlalchemy_connection`` methods.
+ :param kwargs: passed to DbApiHook """ DEFAULT_SQLALCHEMY_SCHEME = "mssql+pyodbc" @@ -50,16 +59,6 @@ def __init__( sqlalchemy_scheme: str | None = None, **kwargs, ) -> None: - """ - :param args: passed to DbApiHook - :param database: database to use -- overrides connection ``schema`` - :param driver: name of driver or path to driver. overrides driver supplied in connection ``extra`` - :param dsn: name of DSN to use. overrides DSN supplied in connection ``extra`` - :param connect_kwargs: keyword arguments passed to ``pyodbc.connect`` - :param sqlalchemy_scheme: Scheme sqlalchemy connection. Default is ``mssql+pyodbc`` Only used for - ``get_sqlalchemy_engine`` and ``get_sqlalchemy_connection`` methods. - :param kwargs: passed to DbApiHook - """ super().__init__(*args, **kwargs) self._database = database self._driver = driver @@ -71,7 +70,7 @@ def __init__( @property def connection(self): - """``airflow.Connection`` object with connection id ``odbc_conn_id``.""" + """The Connection object with ID ``odbc_conn_id``.""" if not self._connection: self._connection = self.get_connection(getattr(self, self.conn_name_attr)) return self._connection @@ -83,7 +82,7 @@ def database(self) -> str | None: @property def sqlalchemy_scheme(self) -> str: - """Sqlalchemy scheme either from constructor, connection extras or default.""" + """SQLAlchemy scheme either from constructor, connection extras or default.""" return ( self._sqlalchemy_scheme or self.connection_extra_lower.get("sqlalchemy_scheme") @@ -119,10 +118,11 @@ def dsn(self) -> str | None: @property def odbc_connection_string(self): - """ - ODBC connection string - We build connection string instead of using ``pyodbc.connect`` params because, for example, there is - no param representing ``ApplicationIntent=ReadOnly``. Any key-value pairs provided in + """ODBC connection string. + + We build the connection string instead of using ``pyodbc.connect`` params + because, for example, there is no param representing + ``ApplicationIntent=ReadOnly``. Any key-value pairs provided in ``Connection.extra`` will be added to the connection string. """ if not self._conn_str: @@ -155,13 +155,14 @@ def odbc_connection_string(self): @property def connect_kwargs(self) -> dict: - """ - Returns effective kwargs to be passed to ``pyodbc.connect`` after merging between conn extra, - ``connect_kwargs`` and hook init. + """Effective kwargs to be passed to ``pyodbc.connect``. - Hook ``connect_kwargs`` precedes ``connect_kwargs`` from conn extra. + The kwargs are merged from connection extra, ``connect_kwargs``, and + the hook's init arguments. Values passed to the hook take precedence + over those from the connection. - If ``attrs_before`` provided, keys and values are converted to int, as required by pyodbc. + If ``attrs_before`` is provided, keys and values are converted to int, + as required by pyodbc. """ conn_connect_kwargs = self.connection_extra_lower.get("connect_kwargs", {}) hook_connect_kwargs = self._connect_kwargs or {} @@ -180,10 +181,7 @@ def get_conn(self) -> pyodbc.Connection: return conn def get_uri(self) -> str: - """ - URI invoked in :py:meth:`~airflow.providers.common.sql.hooks.sql.DbApiHook.get_sqlalchemy_engine` - method.
- """ + """URI invoked in :meth:`~airflow.providers.common.sql.hooks.sql.DbApiHook.get_sqlalchemy_engine`.""" quoted_conn_str = quote_plus(self.odbc_connection_string) uri = f"{self.sqlalchemy_scheme}:///?odbc_connect={quoted_conn_str}" return uri @@ -191,7 +189,7 @@ def get_uri(self) -> str: def get_sqlalchemy_connection( self, connect_kwargs: dict | None = None, engine_kwargs: dict | None = None ) -> Any: - """Sqlalchemy connection object.""" + """SQLAlchemy connection object.""" engine = self.get_sqlalchemy_engine(engine_kwargs=engine_kwargs) cnx = engine.connect(**(connect_kwargs or {})) return cnx diff --git a/airflow/providers/openlineage/extractors/base.py b/airflow/providers/openlineage/extractors/base.py index beb55f4ea43b4..dd714ea89a446 100644 --- a/airflow/providers/openlineage/extractors/base.py +++ b/airflow/providers/openlineage/extractors/base.py @@ -37,8 +37,7 @@ class OperatorLineage: class BaseExtractor(ABC, LoggingMixin): - """ - Abstract base extractor class. + """Abstract base extractor class. This is used mostly to maintain support for custom extractors. """ @@ -52,9 +51,10 @@ def __init__(self, operator): # type: ignore @classmethod @abstractmethod def get_operator_classnames(cls) -> list[str]: - """ - Implement this method returning list of operators that extractor works for. - There are operators which work very similarly and one extractor can cover both. + """Get a list of operators that extractor works for. + + This is an abstract method that subclasses should implement. There are + operators that work very similarly and one extractor can cover. """ raise NotImplementedError() @@ -74,7 +74,8 @@ class DefaultExtractor(BaseExtractor): @classmethod def get_operator_classnames(cls) -> list[str]: - """ + """Assign this extractor to *no* operators. + Default extractor is chosen not on the classname basis, but by existence of get_openlineage_facets method on operator. """ diff --git a/airflow/providers/openlineage/plugins/facets.py b/airflow/providers/openlineage/plugins/facets.py index 7b4cae7d80d79..f73ff1a98bebd 100644 --- a/airflow/providers/openlineage/plugins/facets.py +++ b/airflow/providers/openlineage/plugins/facets.py @@ -55,9 +55,9 @@ class AirflowRunFacet(BaseFacet): @define(slots=False) class UnknownOperatorInstance(RedactMixin): - """ - Describes an unknown operator - specifies the (class) name of the operator - and its properties. + """Describes an unknown operator. + + This specifies the (class) name of the operator and its properties. """ name: str diff --git a/airflow/providers/oracle/hooks/oracle.py b/airflow/providers/oracle/hooks/oracle.py index 016d54f21e239..38f1878964c4d 100644 --- a/airflow/providers/oracle/hooks/oracle.py +++ b/airflow/providers/oracle/hooks/oracle.py @@ -123,13 +123,12 @@ def __init__( self.fetch_lobs = fetch_lobs def get_conn(self) -> oracledb.Connection: - """ - Returns a oracle connection object - Optional parameters for using a custom DSN connection - (instead of using a server alias from tnsnames.ora) - The dsn (data source name) is the TNS entry - (from the Oracle names server or tnsnames.ora file) - or is a string like the one returned from makedsn(). + """Get an Oracle connection object. + + Optional parameters for using a custom DSN connection (instead of using + a server alias from tnsnames.ora) The dsn (data source name) is the TNS + entry (from the Oracle names server or tnsnames.ora file), or is a + string like the one returned from ``makedsn()``. 
:param dsn: the data source name for the Oracle server :param service_name: the db_unique_name of the database @@ -262,15 +261,15 @@ def insert_rows( replace: bool | None = False, **kwargs, ) -> None: - """ - A generic way to insert a set of tuples into a table, - the whole set of inserts is treated as one transaction. - Changes from standard DbApiHook implementation: + """Insert a collection of tuples into a table. - - Oracle SQL queries in oracledb can not be terminated with a semicolon (`;`) - - Replace NaN values with NULL using `numpy.nan_to_num` (not using - `is_nan()` because of input types error for strings) - - Coerce datetime cells to Oracle DATETIME format during insert + The whole set of inserts is treated as one transaction. Changes from standard + DbApiHook implementation: + + - Oracle SQL queries can not be terminated with a semicolon (``;``). + - Replace NaN values with NULL using ``numpy.nan_to_num`` (not using + ``is_nan()`` because of input types error for strings). + - Coerce datetime cells to Oracle DATETIME format during insert. :param table: target Oracle table, use dot notation to target a specific database @@ -327,10 +326,10 @@ def bulk_insert_rows( target_fields: list[str] | None = None, commit_every: int = 5000, ): - """ - A performant bulk insert for oracledb - that uses prepared statements via `executemany()`. - For best performance, pass in `rows` as an iterator. + """A performant bulk insert for Oracle DB. + + This uses prepared statements via ``executemany()``. For best performance, + pass in ``rows`` as an iterator. :param table: target Oracle table, use dot notation to target a specific database @@ -382,7 +381,7 @@ def callproc( """ Call the stored procedure identified by the provided string. Any OUT parameters must be provided with a value of either the expected Python type (e.g., `int`) or an instance of that type. The return value is a list or mapping that includes parameters in diff --git a/airflow/providers/postgres/hooks/postgres.py b/airflow/providers/postgres/hooks/postgres.py index 5f89f4a5f0e48..0a7aa0732a438 100644 --- a/airflow/providers/postgres/hooks/postgres.py +++ b/airflow/providers/postgres/hooks/postgres.py @@ -37,8 +37,7 @@ class PostgresHook(DbApiHook): - """ - Interact with Postgres. + """Interact with Postgres. You can specify ssl parameters in the extra field of your connection as ``{"sslmode": "require", "sslcert": "/path/to/cert.pem", etc}``. @@ -146,8 +145,8 @@ def get_conn(self) -> connection: return self.conn def copy_expert(self, sql: str, filename: str) -> None: - """ - Executes SQL using psycopg2 copy_expert method. + """Execute SQL using psycopg2's ``copy_expert`` method. + + Necessary to execute COPY command without access to a superuser. Note: if this method is called with a "COPY FROM" statement and @@ -169,8 +168,8 @@ def copy_expert(self, sql: str, filename: str) -> None: conn.commit() def get_uri(self) -> str: - """ - Extract the URI from the connection. + """Extract the URI from the connection. + :return: the extracted uri. """ conn = self.get_connection(getattr(self, self.conn_name_attr)) @@ -188,9 +187,10 @@ def bulk_dump(self, table: str, tmp_file: str) -> None: @staticmethod def _serialize_cell(cell: object, conn: connection | None = None) -> Any: - """ - Postgresql will adapt all arguments to the execute() method internally, - hence we return cell without any conversion. + """Serialize a cell.
+ + PostgreSQL adapts all arguments to the ``execute()`` method internally, + hence we return the cell without any conversion. See http://initd.org/psycopg/docs/advanced.html#adapting-new-types for more information. @@ -202,10 +202,11 @@ def _serialize_cell(cell: object, conn: connection | None = None) -> Any: return cell def get_iam_token(self, conn: Connection) -> tuple[str, str, int]: - """ - Uses AWSHook to retrieve a temporary password to connect to Postgres - or Redshift. Port is required. If none is provided, default is used for - each service. + """Get the IAM token. + + This uses AWSHook to retrieve a temporary password to connect to + Postgres or Redshift. Port is required. If none is provided, the default + is used for each service (5432 for Postgres, 5439 for Redshift). """ try: from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook @@ -242,8 +243,7 @@ def get_iam_token(self, conn: Connection) -> tuple[str, str, int]: return login, token, port def get_table_primary_key(self, table: str, schema: str | None = "public") -> list[str] | None: - """ - Helper method that returns the table primary key. + """Get the table's primary key. :param table: Name of the target table :param schema: Name of the target schema, public by default @@ -267,9 +267,9 @@ def get_table_primary_key(self, table: str, schema: str | None = "public") -> li def _generate_insert_sql( cls, table: str, values: tuple[str, ...], target_fields: Iterable[str], replace: bool, **kwargs ) -> str: - """ - Static helper method that generates the INSERT SQL statement. - The REPLACE variant is specific to PostgreSQL syntax. + """Generate the INSERT SQL statement. + + The REPLACE variant is specific to the PostgreSQL syntax. :param table: Name of the target table :param values: The row to insert into the table diff --git a/airflow/providers/sftp/hooks/sftp.py b/airflow/providers/sftp/hooks/sftp.py index 98d4f190f2b35..f1c3fbc0cc10e 100644 --- a/airflow/providers/sftp/hooks/sftp.py +++ b/airflow/providers/sftp/hooks/sftp.py @@ -32,11 +32,10 @@ class SFTPHook(SSHHook): - """ - This hook is inherited from SSH hook. Please refer to SSH hook for the input - arguments. + """Interact with SFTP. - Interact with SFTP. + This hook inherits from the SSH hook. Please refer to the SSH hook for the + input arguments. :Pitfalls:: @@ -127,9 +126,10 @@ def close_conn(self) -> None: self.conn = None def describe_directory(self, path: str) -> dict[str, dict[str, str | int | None]]: - """ - Returns a dictionary of {filename: {attributes}} for all files - on the remote system (where the MLSD command is supported). + """Get file information in a directory on the remote system. + + The return format is ``{filename: {attributes}}``. The remote system + must support the MLSD command. :param path: full path to the remote directory """ @@ -146,8 +146,7 @@ def describe_directory(self, path: str) -> dict[str, dict[str, str | int | None] return files def list_directory(self, path: str) -> list[str]: - """ - Returns a list of files on the remote system. + """List files in a directory on the remote system. :param path: full path to the remote directory to list """ @@ -156,9 +155,10 @@ def list_directory(self, path: str) -> list[str]: return files def mkdir(self, path: str, mode: int = 0o777) -> None: - """ - Creates a directory on the remote system. - The default mode is 0777, but on some systems, the current umask value is first masked out. + """Create a directory on the remote system. + + The default mode is ``0o777``, but on some systems, the current umask + value may be first masked out.
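+ + A small usage sketch (the connection ID and remote path are illustrative): + + .. code-block:: python + + from airflow.providers.sftp.hooks.sftp import SFTPHook + + hook = SFTPHook(ssh_conn_id="sftp_default") + # Create the directory with rwxr-xr-x permissions (subject to the remote umask). + hook.mkdir("/data/incoming", mode=0o755)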
:param path: full path to the remote directory to create :param mode: int permissions of octal mode for directory @@ -167,8 +167,7 @@ def mkdir(self, path: str, mode: int = 0o777) -> None: conn.mkdir(path, mode=mode) def isdir(self, path: str) -> bool: - """ - Checks if the path provided is a directory or not. + """Check if the path provided is a directory. :param path: full path to the remote directory to check """ @@ -180,8 +179,7 @@ def isdir(self, path: str) -> bool: return result def isfile(self, path: str) -> bool: - """ - Checks if the path provided is a file or not. + """Check if the path provided is a file. :param path: full path to the remote file to check """ @@ -193,9 +191,12 @@ def isfile(self, path: str) -> bool: return result def create_directory(self, path: str, mode: int = 0o777) -> None: - """ - Creates a directory on the remote system. - The default mode is 0777, but on some systems, the current umask value is first masked out. + """Create a directory on the remote system. + + The default mode is ``0o777``, but on some systems, the current umask + value may be first masked out. Different from :func:`.mkdir`, this + function attempts to create parent directories if needed, and returns + silently if the target directory already exists. :param path: full path to the remote directory to create :param mode: int permissions of octal mode for directory @@ -215,8 +216,7 @@ def create_directory(self, path: str, mode: int = 0o777) -> None: conn.mkdir(path, mode=mode) def delete_directory(self, path: str) -> None: - """ - Deletes a directory on the remote system. + """Delete a directory on the remote system. :param path: full path to the remote directory to delete """ @@ -224,8 +224,8 @@ def delete_directory(self, path: str) -> None: conn.rmdir(path) def retrieve_file(self, remote_full_path: str, local_full_path: str) -> None: - """ - Transfers the remote file to a local location. + """Transfer the remote file to a local location. + If local_full_path is a string path, the file will be put at that location. @@ -236,8 +236,8 @@ def retrieve_file(self, remote_full_path: str, local_full_path: str) -> None: conn.get(remote_full_path, local_full_path) def store_file(self, remote_full_path: str, local_full_path: str, confirm: bool = True) -> None: - """ - Transfers a local file to the remote location. + """Transfer a local file to the remote location. + If local_full_path_or_buffer is a string path, the file will be read from that location. @@ -248,8 +248,7 @@ def store_file(self, remote_full_path: str, local_full_path: str, confirm: bool conn.put(local_full_path, remote_full_path, confirm=confirm) def delete_file(self, path: str) -> None: - """ - Removes a file on the FTP Server. + """Remove a file on the server. :param path: full path to the remote file """ @@ -257,8 +256,7 @@ def delete_file(self, path: str) -> None: conn.remove(path) def get_mod_time(self, path: str) -> str: - """ - Returns modification time. + """Get an entry's modification time. :param path: full path to the remote file """ @@ -267,8 +265,7 @@ def get_mod_time(self, path: str) -> str: return datetime.datetime.fromtimestamp(ftp_mdtm).strftime("%Y%m%d%H%M%S") # type: ignore def path_exists(self, path: str) -> bool: - """ - Returns True if a remote entity exists. + """Whether a remote entity exists. 
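+ + For instance (the connection ID and paths are illustrative): + + .. code-block:: python + + from airflow.providers.sftp.hooks.sftp import SFTPHook + + hook = SFTPHook(ssh_conn_id="sftp_default") + # Only download the file if it exists on the remote host. + if hook.path_exists("/remote/data/input.csv"): + hook.retrieve_file("/remote/data/input.csv", "/tmp/input.csv")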
@@ -267,8 +265,7 @@ def get_mod_time(self, path: str) -> str:
         return datetime.datetime.fromtimestamp(ftp_mdtm).strftime("%Y%m%d%H%M%S")  # type: ignore
 
     def path_exists(self, path: str) -> bool:
-        """
-        Returns True if a remote entity exists.
+        """Whether a remote entity exists.
 
         :param path: full path to the remote file or directory
         """
@@ -281,8 +278,7 @@ def path_exists(self, path: str) -> bool:
 
     @staticmethod
     def _is_path_match(path: str, prefix: str | None = None, delimiter: str | None = None) -> bool:
-        """
-        Return True if given path starts with prefix (if set) and ends with delimiter (if set).
+        """Whether given path starts with ``prefix`` (if set) and ends with ``delimiter`` (if set).
 
         :param path: path to be checked
         :param prefix: if set path will be checked is starting with prefix
@@ -303,10 +299,10 @@ def walktree(
         ucallback: Callable[[str], Any | None],
         recurse: bool = True,
     ) -> None:
-        """
-        Recursively descend, depth first, the directory tree rooted at
-        path, calling discrete callback functions for each regular file,
-        directory and unknown file type.
+        """Recursively descend, depth first, the directory tree at ``path``.
+
+        This calls discrete callback functions for each regular file, directory,
+        and unknown file type.
 
         :param str path: root of remote directory to descend, use '.' to start at
@@ -320,8 +316,6 @@ def walktree(
             callback function to invoke for an unknown file type.
             (form: ``func(str)``)
         :param bool recurse: *Default: True* - should it recurse
-
-        :returns: None
         """
         conn = self.get_conn()
         for entry in self.list_directory(path):
@@ -343,8 +337,8 @@ def walktree(
     def get_tree_map(
         self, path: str, prefix: str | None = None, delimiter: str | None = None
     ) -> tuple[list[str], list[str], list[str]]:
-        """
-        Return tuple with recursive lists of files, directories and unknown paths from given path.
+        """Get a tuple with recursive lists of files, directories, and unknown paths.
+
         It is possible to filter results by giving prefix and/or delimiter parameters.
 
         :param path: path from which tree will be built
@@ -379,8 +373,7 @@ def test_connection(self) -> tuple[bool, str]:
         return False, str(e)
 
     def get_file_by_pattern(self, path, fnmatch_pattern) -> str:
-        """
-        Returning the first matching file based on the given fnmatch type pattern.
+        """Get the first matching file based on the given fnmatch type pattern.
 
         :param path: path to be checked
         :param fnmatch_pattern: The pattern that will be matched with `fnmatch`
@@ -393,8 +386,7 @@ def get_file_by_pattern(self, path, fnmatch_pattern) -> str:
         return ""
 
     def get_files_by_pattern(self, path, fnmatch_pattern) -> list[str]:
-        """
-        Returning the list of matching files based on the given fnmatch type pattern.
+        """Get all matching files based on the given fnmatch type pattern.
 
         :param path: path to be checked
         :param fnmatch_pattern: The pattern that will be matched with `fnmatch`
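To make the callback contract of ``walktree`` concrete, here is a sketch of collecting ``get_tree_map``-style results with it. The hook instance and the remote path ``/data`` are assumed to exist; the parameter names come straight from the signature above.

.. code-block:: python

    files: list[str] = []
    dirs: list[str] = []
    unknowns: list[str] = []

    # Each callback receives the full remote path of the entry it matched.
    hook.walktree(
        path="/data",
        fcallback=files.append,
        dcallback=dirs.append,
        ucallback=unknowns.append,
        recurse=True,
    )
    print(f"{len(files)} files, {len(dirs)} directories, {len(unknowns)} unknown")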
diff --git a/airflow/providers/slack/operators/slack.py b/airflow/providers/slack/operators/slack.py
index 86c03b3c10e25..6d2f27fbb5280 100644
--- a/airflow/providers/slack/operators/slack.py
+++ b/airflow/providers/slack/operators/slack.py
@@ -29,11 +29,9 @@
 
 
 class SlackAPIOperator(BaseOperator):
-    """
-    Base Slack Operator.
-    The SlackAPIPostOperator is derived from this operator.
-    In the future additional Slack API Operators will be derived from this class as well.
-    Only one of `slack_conn_id` and `token` is required.
+    """Base Slack Operator class.
+
+    Only one of ``slack_conn_id`` or ``token`` is required.
 
     :param slack_conn_id: :ref:`Slack API Connection ` which its password is Slack API token. Optional
@@ -67,15 +65,14 @@ def hook(self) -> SlackHook:
         return SlackHook(token=self.token, slack_conn_id=self.slack_conn_id)
 
     def construct_api_call_params(self) -> Any:
-        """
-        Used by the execute function. Allows templating on the source fields
-        of the api_call_params dict before construction.
-
-        Override in child classes.
-        Each SlackAPIOperator child class is responsible for
-        having a construct_api_call_params function
-        which sets self.api_call_params with a dict of
-        API call parameters (https://api.slack.com/methods)
+        """API call parameters used by the execute function.
+
+        Allows templating on the source fields of the ``api_call_params`` dict
+        before construction.
+
+        Child classes should override this. Each SlackAPIOperator child class is
+        responsible for having this function set ``self.api_call_params`` with a
+        dict of API call parameters (https://api.slack.com/methods)
         """
         raise NotImplementedError(
             "SlackAPIOperator should not be used directly. Chose one of the subclasses instead"
         )
@@ -88,15 +85,14 @@ def execute(self, **kwargs):
 
 
 class SlackAPIPostOperator(SlackAPIOperator):
-    """
-    Posts messages to a Slack channel.
+    """Post messages to a Slack channel.
 
     .. code-block:: python
 
         slack = SlackAPIPostOperator(
             task_id="post_hello",
             dag=dag,
-            token="XXX",
+            token="...",
             text="hello there!",
             channel="#random",
         )
@@ -105,11 +101,11 @@ class SlackAPIPostOperator(SlackAPIOperator):
         ID (C12318391). (templated)
     :param username: Username that airflow will be posting to Slack as. (templated)
     :param text: message to send to slack. (templated)
-    :param icon_url: url to icon used for this message
+    :param icon_url: URL to icon used for this message
     :param attachments: extra formatting details. (templated)
-        - see https://api.slack.com/docs/attachments.
+        See https://api.slack.com/docs/attachments
     :param blocks: extra block layouts. (templated)
-        - see https://api.slack.com/reference/block-kit/blocks.
+        See https://api.slack.com/reference/block-kit/blocks
     """
 
     template_fields: Sequence[str] = ("username", "text", "attachments", "blocks", "channel")
@@ -119,11 +115,14 @@ def __init__(
         self,
         channel: str = "#general",
         username: str = "Airflow",
-        text: str = "No message has been set.\n"
-        "Here is a cat video instead\n"
-        "https://www.youtube.com/watch?v=J---aiyznGQ",
-        icon_url: str = "https://raw.githubusercontent.com/apache/"
-        "airflow/main/airflow/www/static/pin_100.png",
+        text: str = (
+            "No message has been set.\n"
+            "Here is a cat video instead\n"
+            "https://www.youtube.com/watch?v=J---aiyznGQ"
+        ),
+        icon_url: str = (
+            "https://raw.githubusercontent.com/apache/airflow/main/airflow/www/static/pin_100.png"
+        ),
         attachments: list | None = None,
         blocks: list | None = None,
         **kwargs,
@@ -149,8 +148,7 @@ def construct_api_call_params(self) -> Any:
 
 
 class SlackAPIFileOperator(SlackAPIOperator):
-    """
-    Send a file to a Slack channel.
+    """Send a file to a Slack channel.
 
     .. code-block:: python
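Because ``construct_api_call_params`` must be overridden, a sketch of a minimal subclass may help. The operator class below is hypothetical; only the ``method``/``api_call_params`` mechanics come from the base class, and ``reactions.add`` is a real method name from the public Slack API.

.. code-block:: python

    from airflow.providers.slack.operators.slack import SlackAPIOperator


    class SlackAPIReactionOperator(SlackAPIOperator):
        """Hypothetical operator adding an emoji reaction to a message."""

        def __init__(self, channel: str, timestamp: str, emoji: str, **kwargs):
            super().__init__(method="reactions.add", **kwargs)
            self.channel = channel
            self.timestamp = timestamp
            self.emoji = emoji

        def construct_api_call_params(self):
            # The base class's execute() reads api_call_params and passes it
            # to the Slack API call named by self.method.
            self.api_call_params = {
                "channel": self.channel,
                "timestamp": self.timestamp,
                "name": self.emoji,
            }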
diff --git a/airflow/providers/snowflake/hooks/snowflake.py b/airflow/providers/snowflake/hooks/snowflake.py
index 0d03a9ad67020..2a202441a76c9 100644
--- a/airflow/providers/snowflake/hooks/snowflake.py
+++ b/airflow/providers/snowflake/hooks/snowflake.py
@@ -42,12 +42,9 @@ def _try_to_boolean(value: Any):
     return value
 
 
+# TODO: Remove this when provider min airflow version >= 2.5.0 since this is
+# handled by provider manager from that version.
 def _ensure_prefixes(conn_type):
-    """
-    Remove when provider min airflow version >= 2.5.0 since this is handled by
-    provider manager from that version.
-    """
-
     def dec(func):
         @wraps(func)
         def inner():
@@ -71,8 +68,7 @@ def _ensure_prefix(field):
 
 
 class SnowflakeHook(DbApiHook):
-    """
-    A client to interact with Snowflake.
+    """A client to interact with Snowflake.
 
     This hook requires the snowflake_conn_id connection. The snowflake account, login,
     and, password field must be setup in the connection. Other inputs can be defined
@@ -201,9 +197,9 @@ def _get_field(self, extra_dict, field_name):
         return extra_dict.get(backcompat_key) or None
 
     def _get_conn_params(self) -> dict[str, str | None]:
-        """
-        One method to fetch connection params as a dict
-        used in get_uri() and get_connection().
+        """Fetch connection params as a dict.
+
+        This is used in ``get_uri()`` and ``get_connection()``.
         """
         conn = self.get_connection(self.snowflake_conn_id)  # type: ignore[attr-defined]
         extra_dict = conn.extra_dejson
@@ -297,8 +293,7 @@ def get_conn(self) -> SnowflakeConnection:
         return conn
 
     def get_sqlalchemy_engine(self, engine_kwargs=None):
-        """
-        Get an sqlalchemy_engine object.
+        """Get an sqlalchemy_engine object.
 
         :param engine_kwargs: Kwargs used in :func:`~sqlalchemy.create_engine`.
         :return: the created engine.
@@ -331,26 +326,30 @@ def run(
         return_last: bool = True,
         return_dictionaries: bool = False,
     ) -> Any | list[Any] | None:
-        """
-        Runs a command or a list of commands. Pass a list of sql
-        statements to the sql parameter to get them to execute
-        sequentially. The variable execution_info is returned so that
-        it can be used in the Operators to modify the behavior
-        depending on the result of the query (i.e fail the operator
-        if the copy has processed 0 files).
-
-        :param sql: the sql string to be executed with possibly multiple statements,
-            or a list of sql statements to execute
+        """Run a command or a list of commands.
+
+        Pass a list of SQL statements to the ``sql`` parameter to get them to
+        execute sequentially. The variable ``execution_info`` is returned so
+        that it can be used in the Operators to modify the behavior depending on
+        the result of the query (i.e. fail the operator if the copy has processed
+        0 files).
+
+        :param sql: The SQL string to be executed with possibly multiple
+            statements, or a list of SQL statements to execute
         :param autocommit: What to set the connection's autocommit setting to
             before executing the query.
         :param parameters: The parameters to render the SQL query with.
-        :param handler: The result handler which is called with the result of each statement.
-        :param split_statements: Whether to split a single SQL string into statements and run separately
-        :param return_last: Whether to return result for only last statement or for all after split
-        :param return_dictionaries: Whether to return dictionaries rather than regular DBApi sequences
-            as rows in the result. The dictionaries are of form:
-            ``{ 'column1_name': value1, 'column2_name': value2 ... }``.
-        :return: return only result of the LAST SQL expression if handler was provided.
+        :param handler: The result handler which is called with the result of
+            each statement.
+        :param split_statements: Whether to split a single SQL string into
+            statements and run separately.
+        :param return_last: Whether to return result for only last statement or
+            for all after split.
+        :param return_dictionaries: Whether to return dictionaries rather than
+            regular DBAPI sequences as rows in the result. The dictionaries are
+            of form ``{ 'column1_name': value1, 'column2_name': value2 ... }``.
+        :return: Result of the last SQL statement if *handler* is set.
+            *None* otherwise.
         """
         self.query_ids = []
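A short, hedged example of the ``run`` parameters documented above; the connection id ``snowflake_default`` is an assumption, while the parameter names come from the signature itself.

.. code-block:: python

    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

    hook = SnowflakeHook(snowflake_conn_id="snowflake_default")  # assumed connection

    # Two statements in one string: split_statements runs them one at a time,
    # and return_last=False yields one result set per statement.
    results = hook.run(
        sql="SELECT 1; SELECT 2",
        split_statements=True,
        return_last=False,
        handler=lambda cursor: cursor.fetchall(),
    )

With ``handler=None`` the method returns ``None``, matching the ``:return:`` note above.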
""" self.query_ids = [] diff --git a/airflow/providers/ssh/hooks/ssh.py b/airflow/providers/ssh/hooks/ssh.py index 14dd76452850f..1db70b4b12f7d 100644 --- a/airflow/providers/ssh/hooks/ssh.py +++ b/airflow/providers/ssh/hooks/ssh.py @@ -41,9 +41,10 @@ class SSHHook(BaseHook): - """ - Hook for ssh remote execution using Paramiko. - ref: https://github.com/paramiko/paramiko + """Execute remote commands with Paramiko. + + .. seealso:: https://github.com/paramiko/paramiko + This hook also lets you create ssh tunnel and serve as basis for SFTP file transfer. :param ssh_conn_id: :ref:`ssh connection id` from airflow @@ -281,7 +282,7 @@ def host_proxy(self) -> paramiko.ProxyCommand | None: return paramiko.ProxyCommand(cmd) if cmd else None def get_conn(self) -> paramiko.SSHClient: - """Opens a ssh connection to the remote host.""" + """Opens an SSH connection to the remote host.""" self.log.debug("Creating SSH client for conn_id: %s", self.ssh_conn_id) client = paramiko.SSHClient() @@ -380,8 +381,9 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None: def get_tunnel( self, remote_port: int, remote_host: str = "localhost", local_port: int | None = None ) -> SSHTunnelForwarder: - """ - Creates a tunnel between two hosts. Like ssh -L :host:. + """Create a tunnel between two hosts. + + This is conceptually similar to ``ssh -L :host:``. :param remote_port: The remote port to create a tunnel to :param remote_host: The remote host to create a tunnel to (default localhost) @@ -421,13 +423,11 @@ def get_tunnel( def create_tunnel( self, local_port: int, remote_port: int, remote_host: str = "localhost" ) -> SSHTunnelForwarder: - """ - Creates tunnel for SSH connection [Deprecated]. + """Create a tunnel for SSH connection [Deprecated]. :param local_port: local port number :param remote_port: remote port number :param remote_host: remote host - :return: """ warnings.warn( "SSHHook.create_tunnel is deprecated, Please" @@ -440,8 +440,7 @@ def create_tunnel( return self.get_tunnel(remote_port, remote_host, local_port) def _pkey_from_private_key(self, private_key: str, passphrase: str | None = None) -> paramiko.PKey: - """ - Creates appropriate paramiko key for given private key. + """Create an appropriate Paramiko key for a given private key. :param private_key: string containing private key :return: ``paramiko.PKey`` appropriate for given key