diff --git a/airflow/providers/google/cloud/transfers/cassandra_to_gcs.py b/airflow/providers/google/cloud/transfers/cassandra_to_gcs.py index 8c53b929f47fa..bb8dc6a4198d0 100644 --- a/airflow/providers/google/cloud/transfers/cassandra_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/cassandra_to_gcs.py @@ -218,7 +218,7 @@ def _write_local_data_files(self, cursor): def _write_local_schema_file(self, cursor): """ - Takes a cursor, and writes the BigQuery schema for the results to a local file system. + Take a cursor, and write the BigQuery schema for the results to a local file system. :return: A dictionary where key is a filename to be used as an object name in GCS, and values are file handles to local files that @@ -253,7 +253,7 @@ def _upload_to_gcs(self, file_to_upload): ) def generate_data_dict(self, names: Iterable[str], values: Any) -> dict[str, Any]: - """Generates data structure that will be stored as file in GCS.""" + """Generate data structure that will be stored as a file in GCS.""" return {n: self.convert_value(v) for n, v in zip(names, values)} def convert_value(self, value: Any | None) -> Any | None: @@ -285,12 +285,12 @@ def convert_value(self, value: Any | None) -> Any | None: raise AirflowException(f"Unexpected value: {value}") def convert_array_types(self, value: list[Any] | SortedSet) -> list[Any]: - """Maps convert_value over array.""" + """Map convert_value over array.""" return [self.convert_value(nested_value) for nested_value in value] def convert_user_type(self, value: Any) -> dict[str, Any]: """ - Converts a user type to RECORD that contains n fields, where n is the number of attributes. + Convert a user type to RECORD that contains n fields, where n is the number of attributes. Each element in the user type class will be converted to its corresponding data type in BQ. """ diff --git a/airflow/providers/google/cloud/transfers/gcs_to_sftp.py b/airflow/providers/google/cloud/transfers/gcs_to_sftp.py index 150c801861eee..b23fe03c5e16b 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_sftp.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_sftp.py @@ -176,7 +176,7 @@ def _copy_single_object( source_object: str, destination_path: str, ) -> None: - """Helper function to copy single object.""" + """Copy a single object.""" self.log.info( "Executing copy of gs://%s/%s to %s", self.source_bucket, diff --git a/airflow/providers/google/cloud/transfers/mssql_to_gcs.py b/airflow/providers/google/cloud/transfers/mssql_to_gcs.py index 81e9af6d314ca..ab05518fc5e51 100644 --- a/airflow/providers/google/cloud/transfers/mssql_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/mssql_to_gcs.py @@ -76,7 +76,7 @@ def __init__( def query(self): """ - Queries MSSQL and returns a cursor of results. + Query MSSQL and return a cursor of results.
:return: mssql cursor """ diff --git a/airflow/providers/google/cloud/transfers/mysql_to_gcs.py b/airflow/providers/google/cloud/transfers/mysql_to_gcs.py index cefde7c4ab074..877df731ca44c 100644 --- a/airflow/providers/google/cloud/transfers/mysql_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/mysql_to_gcs.py @@ -67,7 +67,7 @@ def __init__(self, *, mysql_conn_id="mysql_default", ensure_utc=False, **kwargs) self.ensure_utc = ensure_utc def query(self): - """Queries mysql and returns a cursor to the results.""" + """Query MySQL and return a cursor to the results.""" mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id) conn = mysql.get_conn() cursor = conn.cursor() diff --git a/airflow/providers/google/cloud/transfers/oracle_to_gcs.py b/airflow/providers/google/cloud/transfers/oracle_to_gcs.py index fc96357d51725..d9ffce27f2a1f 100644 --- a/airflow/providers/google/cloud/transfers/oracle_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/oracle_to_gcs.py @@ -62,7 +62,7 @@ def __init__(self, *, oracle_conn_id="oracle_default", ensure_utc=False, **kwarg self.oracle_conn_id = oracle_conn_id def query(self): - """Queries Oracle and returns a cursor to the results.""" + """Query Oracle and return a cursor to the results.""" oracle = OracleHook(oracle_conn_id=self.oracle_conn_id) conn = oracle.get_conn() cursor = conn.cursor() diff --git a/airflow/providers/google/cloud/transfers/postgres_to_gcs.py b/airflow/providers/google/cloud/transfers/postgres_to_gcs.py index 9120f7c95afde..f97c3c7eeec12 100644 --- a/airflow/providers/google/cloud/transfers/postgres_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/postgres_to_gcs.py @@ -115,7 +115,7 @@ def _unique_name(self): return f"{self.dag_id}__{self.task_id}__{uuid.uuid4()}" if self.use_server_side_cursor else None def query(self): - """Queries Postgres and returns a cursor to the results.""" + """Query Postgres and return a cursor to the results.""" hook = PostgresHook(postgres_conn_id=self.postgres_conn_id) conn = hook.get_conn() cursor = conn.cursor(name=self._unique_name()) diff --git a/airflow/providers/google/cloud/transfers/presto_to_gcs.py b/airflow/providers/google/cloud/transfers/presto_to_gcs.py index 34e412802c2fe..c1c6a33417d3b 100644 --- a/airflow/providers/google/cloud/transfers/presto_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/presto_to_gcs.py @@ -181,7 +181,7 @@ def __init__(self, *, presto_conn_id: str = "presto_default", **kwargs): self.presto_conn_id = presto_conn_id def query(self): - """Queries presto and returns a cursor to the results.""" + """Query Presto and return a cursor to the results.""" presto = PrestoHook(presto_conn_id=self.presto_conn_id) conn = presto.get_conn() cursor = conn.cursor() diff --git a/airflow/providers/google/cloud/transfers/s3_to_gcs.py b/airflow/providers/google/cloud/transfers/s3_to_gcs.py index 37d438f3dec63..30056a7e197ae 100644 --- a/airflow/providers/google/cloud/transfers/s3_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/s3_to_gcs.py @@ -219,7 +219,7 @@ def exclude_existing_objects(self, s3_objects: list[str], gcs_hook: GCSHook) -> def s3_to_gcs_object(self, s3_object: str) -> str: """ - Transforms S3 path to GCS path according to the operator's logic.
+ Transform S3 path to GCS path according to the operator's logic. If apply_gcs_prefix == True then <s3_prefix><content> => <gcs_prefix><content> If apply_gcs_prefix == False then <s3_prefix><content> => <gcs_prefix><s3_prefix><content> @@ -233,7 +233,7 @@ def s3_to_gcs_object(self, s3_object: str) -> str: def gcs_to_s3_object(self, gcs_object: str) -> str: """ - Transforms GCS path to S3 path according to the operator's logic. + Transform GCS path to S3 path according to the operator's logic. If apply_gcs_prefix == True then <gcs_prefix><content> => <s3_prefix><content> If apply_gcs_prefix == False then <gcs_prefix><s3_prefix><content> => <s3_prefix><content> @@ -261,7 +261,7 @@ def transfer_files(self, s3_objects: list[str], gcs_hook: GCSHook, s3_hook: S3Ho self.log.info("All done, uploaded %d files to Google Cloud Storage", len(s3_objects)) def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> None: - """Submits Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS.""" + """Submit Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS.""" if not len(files): raise ValueError("List of transferring files cannot be empty") job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, s3_hook=s3_hook) diff --git a/airflow/providers/google/cloud/transfers/sftp_to_gcs.py b/airflow/providers/google/cloud/transfers/sftp_to_gcs.py index d22cc8ae3e4d1..833edcda91bc4 100644 --- a/airflow/providers/google/cloud/transfers/sftp_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/sftp_to_gcs.py @@ -147,7 +147,7 @@ def _copy_single_object( source_path: str, destination_object: str, ) -> None: - """Helper function to copy single object.""" + """Copy a single object.""" self.log.info( "Executing copy of %s to gs://%s/%s", source_path, diff --git a/airflow/providers/google/cloud/transfers/sql_to_gcs.py b/airflow/providers/google/cloud/transfers/sql_to_gcs.py index 1529430c97a07..efb5693dcf582 100644 --- a/airflow/providers/google/cloud/transfers/sql_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/sql_to_gcs.py @@ -228,7 +228,7 @@ def _write_rows_to_parquet(parquet_writer: pq.ParquetWriter, rows): def _write_local_data_files(self, cursor): """ - Takes a cursor, and writes results to a local file. + Take a cursor, and write results to a local file. :return: A dictionary where keys are filenames to be used as object names in GCS, and values are file handles to local files that @@ -348,7 +348,7 @@ def _write_local_data_files(self, cursor): yield file_to_upload def _get_file_to_upload(self, file_mime_type, file_no): - """Returns a dictionary that represents the file to upload.""" + """Return a dictionary that represents the file to upload.""" tmp_file_handle = NamedTemporaryFile(mode="w", encoding="utf-8", delete=True) return ( { @@ -435,7 +435,7 @@ def _get_col_type_dict(self): def _write_local_schema_file(self, cursor): """ - Takes a cursor, and writes the BigQuery schema for the results to a local file system. + Take a cursor, and write the BigQuery schema for the results to a local file system. Schema for database will be read from cursor if not specified.
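Note on the convention applied throughout this diff: first docstring lines are rewritten into the imperative mood, per PEP 257 and pydocstyle rule D401. A minimal sketch of the pattern for the query()/write/upload methods above, using a hypothetical operator name that is not part of this change:

    class ExampleToGCSOperator:
        """Copy rows from a hypothetical source database to Google Cloud Storage."""

        def query(self):
            """Query the source database and return a cursor to the results."""
            # D401 style: "Query ... and return ...", not "Queries ... and returns ...".
            raise NotImplementedError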
diff --git a/airflow/providers/google/cloud/transfers/trino_to_gcs.py b/airflow/providers/google/cloud/transfers/trino_to_gcs.py index d30827722e764..9fa91f1a332df 100644 --- a/airflow/providers/google/cloud/transfers/trino_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/trino_to_gcs.py @@ -181,7 +181,7 @@ def __init__(self, *, trino_conn_id: str = "trino_default", **kwargs): self.trino_conn_id = trino_conn_id def query(self): - """Queries trino and returns a cursor to the results.""" + """Query Trino and return a cursor to the results.""" trino = TrinoHook(trino_conn_id=self.trino_conn_id) conn = trino.get_conn() cursor = conn.cursor() diff --git a/airflow/providers/google/cloud/triggers/bigquery.py b/airflow/providers/google/cloud/triggers/bigquery.py index a28eada370955..3d73e91e989e8 100644 --- a/airflow/providers/google/cloud/triggers/bigquery.py +++ b/airflow/providers/google/cloud/triggers/bigquery.py @@ -68,7 +68,7 @@ def __init__( self.impersonation_chain = impersonation_chain def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes BigQueryInsertJobTrigger arguments and classpath.""" + """Serialize BigQueryInsertJobTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.bigquery.BigQueryInsertJobTrigger", { @@ -83,7 +83,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: ) async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] - """Gets current job execution status and yields a TriggerEvent.""" + """Get current job execution status and yield a TriggerEvent.""" hook = self._get_async_hook() try: while True: @@ -119,7 +119,7 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger): """BigQueryCheckTrigger run on the trigger worker.""" def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes BigQueryCheckTrigger arguments and classpath.""" + """Serialize BigQueryCheckTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.bigquery.BigQueryCheckTrigger", { @@ -134,7 +134,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: ) async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] - """Gets current job execution status and yields a TriggerEvent.""" + """Get current job execution status and yield a TriggerEvent.""" hook = self._get_async_hook() try: while True: @@ -193,7 +193,7 @@ def __init__(self, as_dict: bool = False, **kwargs): self.as_dict = as_dict def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes BigQueryInsertJobTrigger arguments and classpath.""" + """Serialize BigQueryGetDataTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.bigquery.BigQueryGetDataTrigger", { @@ -209,7 +209,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: ) async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] - """Gets current job execution status and yields a TriggerEvent with response data.""" + """Get current job execution status and yield a TriggerEvent with response data.""" hook = self._get_async_hook() try: while True: @@ -307,7 +307,7 @@ def __init__( self.ignore_zero = ignore_zero def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes BigQueryCheckTrigger arguments and classpath.""" + """Serialize BigQueryIntervalCheckTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.bigquery.BigQueryIntervalCheckTrigger", { @@ -326,7 +326,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: ) async def run(self) -> AsyncIterator[TriggerEvent]: #
type: ignore[override] - """Gets current job execution status and yields a TriggerEvent.""" + """Get current job execution status and yield a TriggerEvent.""" hook = self._get_async_hook() try: while True: @@ -452,7 +452,7 @@ def __init__( self.tolerance = tolerance def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes BigQueryValueCheckTrigger arguments and classpath.""" + """Serialize BigQueryValueCheckTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.bigquery.BigQueryValueCheckTrigger", { @@ -470,7 +470,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: ) async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] - """Gets current job execution status and yields a TriggerEvent.""" + """Get current job execution status and yield a TriggerEvent.""" hook = self._get_async_hook() try: while True: @@ -536,7 +536,7 @@ def __init__( self.impersonation_chain = impersonation_chain def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes BigQueryTableExistenceTrigger arguments and classpath.""" + """Serialize BigQueryTableExistenceTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger", { @@ -623,7 +623,7 @@ def __init__(self, partition_id: str, **kwargs): self.partition_id = partition_id def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes BigQueryTablePartitionExistenceTrigger arguments and classpath.""" + """Serialize BigQueryTablePartitionExistenceTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.bigquery.BigQueryTablePartitionExistenceTrigger", { diff --git a/airflow/providers/google/cloud/triggers/bigquery_dts.py b/airflow/providers/google/cloud/triggers/bigquery_dts.py index 16d8ff7b34ff2..4d4d7fbdfa892 100644 --- a/airflow/providers/google/cloud/triggers/bigquery_dts.py +++ b/airflow/providers/google/cloud/triggers/bigquery_dts.py @@ -66,7 +66,7 @@ def __init__( self.impersonation_chain = impersonation_chain def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes class arguments and classpath.""" + """Serialize class arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.bigquery_dts.BigQueryDataTransferRunTrigger", { diff --git a/airflow/providers/google/cloud/triggers/cloud_batch.py b/airflow/providers/google/cloud/triggers/cloud_batch.py index 3ae6211fd3eb3..21b86688ab7ec 100644 --- a/airflow/providers/google/cloud/triggers/cloud_batch.py +++ b/airflow/providers/google/cloud/triggers/cloud_batch.py @@ -67,7 +67,7 @@ def __init__( self.impersonation_chain = impersonation_chain def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes class arguments and classpath.""" + """Serialize class arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.cloud_batch.CloudBatchJobFinishedTrigger", { @@ -83,6 +83,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]: async def run(self) -> AsyncIterator[TriggerEvent]: """ + Fetch job status and yield certain Events. + Main loop of the class in where it is fetching the job status and yields certain Event.
If the job has status success then it yields TriggerEvent with success status, if job has diff --git a/airflow/providers/google/cloud/triggers/cloud_build.py b/airflow/providers/google/cloud/triggers/cloud_build.py index dddb9d823acf3..20fd5d701ec80 100644 --- a/airflow/providers/google/cloud/triggers/cloud_build.py +++ b/airflow/providers/google/cloud/triggers/cloud_build.py @@ -62,7 +62,7 @@ def __init__( self.location = location def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes CloudBuildCreateBuildTrigger arguments and classpath.""" + """Serialize CloudBuildCreateBuildTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.cloud_build.CloudBuildCreateBuildTrigger", { @@ -76,7 +76,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: ) async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] - """Gets current build execution status and yields a TriggerEvent.""" + """Get current build execution status and yield a TriggerEvent.""" hook = self._get_async_hook() try: while True: diff --git a/airflow/providers/google/cloud/triggers/cloud_run.py b/airflow/providers/google/cloud/triggers/cloud_run.py index c0d3458c1246a..4e740aa048e40 100644 --- a/airflow/providers/google/cloud/triggers/cloud_run.py +++ b/airflow/providers/google/cloud/triggers/cloud_run.py @@ -81,7 +81,7 @@ def __init__( self.impersonation_chain = impersonation_chain def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes class arguments and classpath.""" + """Serialize class arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.cloud_run.CloudRunJobFinishedTrigger", { diff --git a/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py b/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py index 32cb855f1785f..cd0440cee2eb0 100644 --- a/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py +++ b/airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py @@ -46,7 +46,7 @@ def __init__(self, job_names: list[str], project_id: str | None = None, poll_int self.poll_interval = poll_interval def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes StorageTransferJobsTrigger arguments and classpath.""" + """Serialize StorageTransferJobsTrigger arguments and classpath.""" return ( f"{self.__class__.__module__ }.{self.__class__.__qualname__}", { @@ -57,7 +57,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: ) async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] - """Gets current data storage transfer jobs and yields a TriggerEvent.""" + """Get current data storage transfer jobs and yield a TriggerEvent.""" async_hook: CloudDataTransferServiceAsyncHook = self.get_async_hook() while True: diff --git a/airflow/providers/google/cloud/triggers/dataflow.py b/airflow/providers/google/cloud/triggers/dataflow.py index 30f42dfdb19ca..c752b79978dde 100644 --- a/airflow/providers/google/cloud/triggers/dataflow.py +++ b/airflow/providers/google/cloud/triggers/dataflow.py @@ -69,7 +69,7 @@ def __init__( self.cancel_timeout = cancel_timeout def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes class arguments and classpath.""" + """Serialize class arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.dataflow.TemplateJobStartTrigger", { @@ -85,6 +85,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]: async def run(self): """ + Fetch job status and yield certain Events.
+ Main loop of the class in where it is fetching the job status and yields certain Event. If the job has status success then it yields TriggerEvent with success status, if job has diff --git a/airflow/providers/google/cloud/triggers/datafusion.py b/airflow/providers/google/cloud/triggers/datafusion.py index d1419bfbc5f5d..563b03ccd2c2f 100644 --- a/airflow/providers/google/cloud/triggers/datafusion.py +++ b/airflow/providers/google/cloud/triggers/datafusion.py @@ -71,7 +71,7 @@ def __init__( self.success_states = success_states def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes DataFusionStartPipelineTrigger arguments and classpath.""" + """Serialize DataFusionStartPipelineTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.datafusion.DataFusionStartPipelineTrigger", { @@ -86,7 +86,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: ) async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] - """Gets current pipeline status and yields a TriggerEvent.""" + """Get current pipeline status and yield a TriggerEvent.""" hook = self._get_async_hook() try: while True: diff --git a/airflow/providers/google/cloud/triggers/dataplex.py b/airflow/providers/google/cloud/triggers/dataplex.py index ae03023926a2d..241981603ae8b 100644 --- a/airflow/providers/google/cloud/triggers/dataplex.py +++ b/airflow/providers/google/cloud/triggers/dataplex.py @@ -105,7 +105,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": self._convert_to_dict(job)}) def _convert_to_dict(self, job: DataScanJob) -> dict: - """Returns a representation of a DataScanJob instance as a dict.""" + """Return a representation of a DataScanJob instance as a dict.""" return DataScanJob.to_dict(job) @@ -187,5 +187,5 @@ async def run(self) -> AsyncIterator[TriggerEvent]: yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": self._convert_to_dict(job)}) def _convert_to_dict(self, job: DataScanJob) -> dict: - """Returns a representation of a DataScanJob instance as a dict.""" + """Return a representation of a DataScanJob instance as a dict.""" return DataScanJob.to_dict(job) diff --git a/airflow/providers/google/cloud/triggers/dataproc.py b/airflow/providers/google/cloud/triggers/dataproc.py index 7a612cdc29292..d9abe62a92333 100644 --- a/airflow/providers/google/cloud/triggers/dataproc.py +++ b/airflow/providers/google/cloud/triggers/dataproc.py @@ -182,7 +182,7 @@ def __init__(self, batch_id: str, **kwargs): self.batch_id = batch_id def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes DataprocBatchTrigger arguments and classpath.""" + """Serialize DataprocBatchTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.dataproc.DataprocBatchTrigger", { @@ -244,7 +244,7 @@ def __init__( self.metadata = metadata def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes DataprocDeleteClusterTrigger arguments and classpath.""" + """Serialize DataprocDeleteClusterTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.dataproc.DataprocDeleteClusterTrigger", { diff --git a/airflow/providers/google/cloud/triggers/gcs.py b/airflow/providers/google/cloud/triggers/gcs.py index f801e5ae9a78d..3ec1bbddfd733 100644 --- a/airflow/providers/google/cloud/triggers/gcs.py +++ b/airflow/providers/google/cloud/triggers/gcs.py @@ -60,7 +60,7 @@ def __init__( self.hook_params = hook_params def serialize(self) -> tuple[str,
dict[str, Any]]: - """Serializes GCSBlobTrigger arguments and classpath.""" + """Serialize GCSBlobTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.gcs.GCSBlobTrigger", { @@ -93,7 +93,7 @@ def _get_async_hook(self) -> GCSAsyncHook: async def _object_exists(self, hook: GCSAsyncHook, bucket_name: str, object_name: str) -> str: """ - Checks for the existence of a file in Google Cloud Storage. + Check for the existence of a file in Google Cloud Storage. :param bucket_name: The Google Cloud Storage bucket where the object is. :param object_name: The name of the blob_name to check in the Google cloud @@ -143,7 +143,7 @@ def __init__( self.hook_params = hook_params def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes GCSCheckBlobUpdateTimeTrigger arguments and classpath.""" + """Serialize GCSCheckBlobUpdateTimeTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.gcs.GCSCheckBlobUpdateTimeTrigger", { @@ -181,7 +181,7 @@ async def _is_blob_updated_after( self, hook: GCSAsyncHook, bucket_name: str, object_name: str, target_date: datetime ) -> tuple[bool, dict[str, Any]]: """ - Checks if the object in the bucket is updated. + Check if the object in the bucket is updated. :param hook: GCSAsyncHook Hook class :param bucket_name: The Google Cloud Storage bucket where the object is. @@ -248,7 +248,7 @@ def __init__( self.prefix = prefix def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes GCSPrefixBlobTrigger arguments and classpath.""" + """Serialize GCSPrefixBlobTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.gcs.GCSPrefixBlobTrigger", { @@ -282,7 +282,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: async def _list_blobs_with_prefix(self, hook: GCSAsyncHook, bucket_name: str, prefix: str) -> list[str]: """ - Returns names of blobs which match the given prefix for a given bucket. + Return names of blobs which match the given prefix for a given bucket. :param hook: The async hook to use for listing the blobs :param bucket_name: The Google Cloud Storage bucket where the object is. @@ -344,7 +344,7 @@ def __init__( self.last_activity_time: datetime | None = None def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes GCSUploadSessionTrigger arguments and classpath.""" + """Serialize GCSUploadSessionTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.gcs.GCSUploadSessionTrigger", { @@ -377,7 +377,11 @@ async def run(self) -> AsyncIterator[TriggerEvent]: yield TriggerEvent({"status": "error", "message": str(e)}) def _get_time(self) -> datetime: - """This is just a wrapper of datetime.datetime.now to simplify mocking in the unittests.""" + """ + Get current local date and time. + + This is just a wrapper of datetime.datetime.now to simplify mocking in unit tests.
+ """ return datetime.now() def _is_bucket_updated(self, current_objects: set[str]) -> dict[str, str]: diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py b/airflow/providers/google/cloud/triggers/kubernetes_engine.py index 0167ce5a6a743..de5984db47ee4 100644 --- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py +++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py @@ -174,7 +174,7 @@ def __init__( self._hook: GKEAsyncHook | None = None def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes GKEOperationTrigger arguments and classpath.""" + """Serialize GKEOperationTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.kubernetes_engine.GKEOperationTrigger", { @@ -188,7 +188,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: ) async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] - """Gets operation status and yields corresponding event.""" + """Get operation status and yields corresponding event.""" hook = self._get_hook() try: while True: diff --git a/airflow/providers/google/cloud/triggers/mlengine.py b/airflow/providers/google/cloud/triggers/mlengine.py index d5c6cd60ca3dc..9e1e90df3c689 100644 --- a/airflow/providers/google/cloud/triggers/mlengine.py +++ b/airflow/providers/google/cloud/triggers/mlengine.py @@ -69,7 +69,7 @@ def __init__( self.impersonation_chain = impersonation_chain def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes MLEngineStartTrainingJobTrigger arguments and classpath.""" + """Serialize MLEngineStartTrainingJobTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.mlengine.MLEngineStartTrainingJobTrigger", { @@ -89,7 +89,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: ) async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] - """Gets current job execution status and yields a TriggerEvent.""" + """Get current job execution status and yields a TriggerEvent.""" hook = self._get_async_hook() try: while True: diff --git a/airflow/providers/google/cloud/triggers/pubsub.py b/airflow/providers/google/cloud/triggers/pubsub.py index b39d491fc6f74..51a530922bdec 100644 --- a/airflow/providers/google/cloud/triggers/pubsub.py +++ b/airflow/providers/google/cloud/triggers/pubsub.py @@ -79,7 +79,7 @@ def __init__( self.hook = PubSubAsyncHook() def serialize(self) -> tuple[str, dict[str, Any]]: - """Serializes PubsubPullTrigger arguments and classpath.""" + """Serialize PubsubPullTrigger arguments and classpath.""" return ( "airflow.providers.google.cloud.triggers.pubsub.PubsubPullTrigger", { diff --git a/airflow/providers/google/cloud/utils/bigquery.py b/airflow/providers/google/cloud/utils/bigquery.py index 86a88f96e7ff5..53b92732d84a7 100644 --- a/airflow/providers/google/cloud/utils/bigquery.py +++ b/airflow/providers/google/cloud/utils/bigquery.py @@ -21,7 +21,7 @@ def bq_cast(string_field: str, bq_type: str) -> None | int | float | bool | str: """ - Helper method that casts a BigQuery row to the appropriate data types. + Cast a BigQuery row to the appropriate data types. This is useful because BigQuery returns all fields as strings. """ @@ -41,7 +41,7 @@ def bq_cast(string_field: str, bq_type: str) -> None | int | float | bool | str: def convert_job_id(job_id: str | list[str], project_id: str, location: str | None) -> Any: """ - Helper method that converts to path: project_id:location:job_id. + Convert job_id to path: project_id:location:job_id. :param project_id: Required. 
The ID of the Google Cloud project where workspace located. :param location: Optional. The ID of the Google Cloud region where workspace located. diff --git a/airflow/providers/google/cloud/utils/credentials_provider.py b/airflow/providers/google/cloud/utils/credentials_provider.py index df83358e4b49e..06f2b5c4d83ff 100644 --- a/airflow/providers/google/cloud/utils/credentials_provider.py +++ b/airflow/providers/google/cloud/utils/credentials_provider.py @@ -358,7 +358,7 @@ def _log_debug(self, *args, **kwargs) -> None: def get_credentials_and_project_id(*args, **kwargs) -> tuple[google.auth.credentials.Credentials, str]: - """Returns the Credentials object for Google API and the associated project_id.""" + """Return the Credentials object for Google API and the associated project_id.""" return _CredentialProvider(*args, **kwargs).get_credentials_and_project() @@ -398,7 +398,7 @@ def _get_target_principal_and_delegates( def _get_project_id_from_service_account_email(service_account_email: str) -> str: """ - Extracts project_id from service account's email address. + Extract project_id from service account's email address. :param service_account_email: email of the service account. diff --git a/airflow/providers/google/cloud/utils/dataform.py b/airflow/providers/google/cloud/utils/dataform.py index 3395843cec420..636a605ab60e5 100644 --- a/airflow/providers/google/cloud/utils/dataform.py +++ b/airflow/providers/google/cloud/utils/dataform.py @@ -45,7 +45,7 @@ def make_initialization_workspace_flow( without_installation: bool = False, ) -> tuple: """ - Creates flow which simulates the initialization of the default project. + Create a flow which simulates the initialization of the default project. :param project_id: Required. The ID of the Google Cloud project where workspace located. :param region: Required. The ID of the Google Cloud region where workspace located. diff --git a/airflow/providers/google/cloud/utils/field_validator.py b/airflow/providers/google/cloud/utils/field_validator.py index 1a3ccbff40cb5..2e5d6cc4e9d7f 100644 --- a/airflow/providers/google/cloud/utils/field_validator.py +++ b/airflow/providers/google/cloud/utils/field_validator.py @@ -309,7 +309,7 @@ def _validate_union( def _validate_field(self, validation_spec, dictionary_to_validate, parent=None, force_optional=False): """ - Validates if field is OK. + Validate if field is OK. :param validation_spec: specification of the field :param dictionary_to_validate: dictionary where the field should be present @@ -413,7 +413,7 @@ def _validate_field(self, validation_spec, dictionary_to_validate, parent=None, def validate(self, body_to_validate: dict) -> None: """ - Validates if the body (dictionary) follows specification that the validator was instantiated with. + Validate if the body (dictionary) follows specification that the validator was instantiated with. Raises ValidationSpecificationException or ValidationFieldException in case of problems with specification or the body not conforming to the specification respectively.
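The validate() contract above can be exercised directly. A hedged usage sketch: the spec format (name/allow_empty/type/fields keys) mirrors the validation specs used by the provider's CloudSQL operators, and the concrete field names here are illustrative only:

    from airflow.providers.google.cloud.utils.field_validator import GcpBodyFieldValidator

    # Illustrative spec: "name" must be present and non-empty; "settings" is a
    # nested dict whose "tier" field must be non-empty.
    SPECIFICATION = [
        dict(name="name", allow_empty=False),
        dict(name="settings", type="dict", fields=[dict(name="tier", allow_empty=False)]),
    ]

    validator = GcpBodyFieldValidator(SPECIFICATION, api_version="v1")
    validator.validate({"name": "instance-1", "settings": {"tier": "db-n1-standard-1"}})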
diff --git a/airflow/providers/google/cloud/utils/helpers.py b/airflow/providers/google/cloud/utils/helpers.py index a0ff3e58ef4ff..4ade5474f3448 100644 --- a/airflow/providers/google/cloud/utils/helpers.py +++ b/airflow/providers/google/cloud/utils/helpers.py @@ -19,12 +19,12 @@ def normalize_directory_path(source_object: str | None) -> str | None: - """Makes sure dir path ends with a slash.""" + """Make sure dir path ends with a slash.""" return source_object + "/" if source_object and not source_object.endswith("/") else source_object def resource_path_to_dict(resource_name: str) -> dict[str, str]: - """Converts a path-like GCP resource name into a dictionary. + """Convert a path-like GCP resource name into a dictionary. For example, the path `projects/my-project/locations/my-location/instances/my-instance` will be converted to a dict: diff --git a/airflow/providers/google/cloud/utils/mlengine_operator_utils.py b/airflow/providers/google/cloud/utils/mlengine_operator_utils.py index cc85705653e06..683ddcaa28b92 100644 --- a/airflow/providers/google/cloud/utils/mlengine_operator_utils.py +++ b/airflow/providers/google/cloud/utils/mlengine_operator_utils.py @@ -57,7 +57,7 @@ def create_evaluate_ops( py_interpreter="python3", ) -> tuple[MLEngineStartBatchPredictionJobOperator, BeamRunPythonPipelineOperator, PythonOperator]: r""" - Creates Operators needed for model evaluation and returns. + Create Operators needed for model evaluation and return them. This function is deprecated. All the functionality of legacy MLEngine and new features are available on the Vertex AI platform. diff --git a/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py b/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py index 08ae13e4d1fea..1f1f3fff667eb 100644 --- a/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py +++ b/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py @@ -151,7 +151,7 @@ def MakeSummary(pcoll, metric_fn, metric_keys): def run(argv=None): - """Helper for obtaining prediction summary.""" + """Obtain prediction summary.""" parser = argparse.ArgumentParser() parser.add_argument( "--prediction_path", diff --git a/airflow/providers/google/common/auth_backend/google_openid.py b/airflow/providers/google/common/auth_backend/google_openid.py index eef4d5e31205b..5b387bc8c8fe4 100644 --- a/airflow/providers/google/common/auth_backend/google_openid.py +++ b/airflow/providers/google/common/auth_backend/google_openid.py @@ -52,7 +52,7 @@ def create_client_session(): def init_app(_): - """Initializes authentication.""" + """Initialize authentication.""" def _get_id_token_from_request(request) -> str | None: @@ -110,7 +110,7 @@ def _set_current_user(user): def requires_authentication(function: T): - """Decorator for functions that require authentication.""" + """Decorate functions that require authentication.""" @wraps(function) def decorated(*args, **kwargs): diff --git a/airflow/providers/google/common/hooks/base_google.py b/airflow/providers/google/common/hooks/base_google.py index 239f2175f3f93..a83524422a82c 100644 --- a/airflow/providers/google/common/hooks/base_google.py +++ b/airflow/providers/google/common/hooks/base_google.py @@ -79,6 +79,8 @@ def is_soft_quota_exception(exception: Exception): """ + Check for quota violation errors. + API for Google services does not have a standardized way to report quota violation errors.
The function has been adapted by trial and error to the following services: @@ -100,6 +102,8 @@ def is_soft_quota_exception(exception: Exception): def is_operation_in_progress_exception(exception: Exception) -> bool: """ + Handle operation in-progress exceptions. + Some calls return 429 (too many requests!) or 409 errors (Conflict) in case of operation in progress. * Google Cloud SQL @@ -195,7 +199,7 @@ class GoogleBaseHook(BaseHook): @classmethod def get_connection_form_widgets(cls) -> dict[str, Any]: - """Returns connection widgets to add to connection form.""" + """Return connection widgets to add to connection form.""" from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget, BS3TextFieldWidget from flask_babel import lazy_gettext from wtforms import IntegerField, PasswordField, StringField @@ -228,7 +232,7 @@ def get_connection_form_widgets(cls) -> dict[str, Any]: @classmethod def get_ui_field_behaviour(cls) -> dict[str, Any]: - """Returns custom field behaviour.""" + """Return custom field behaviour.""" return { "hidden_fields": ["host", "schema", "login", "password", "port", "extra"], "relabeling": {}, @@ -249,7 +253,7 @@ def __init__( self._cached_project_id: str | None = None def get_credentials_and_project_id(self) -> tuple[google.auth.credentials.Credentials, str | None]: - """Returns the Credentials object for Google API and the associated project_id.""" + """Return the Credentials object for Google API and the associated project_id.""" if self._cached_credentials is not None: return self._cached_credentials, self._cached_project_id @@ -299,12 +303,12 @@ def get_credentials_and_project_id(self) -> tuple[google.auth.credentials.Creden return credentials, project_id def get_credentials(self) -> google.auth.credentials.Credentials: - """Returns the Credentials object for Google API.""" + """Return the Credentials object for Google API.""" credentials, _ = self.get_credentials_and_project_id() return credentials def _get_access_token(self) -> str: - """Returns a valid access token from Google API Credentials.""" + """Return a valid access token from Google API Credentials.""" credentials = self.get_credentials() auth_req = google.auth.transport.requests.Request() # credentials.token is None @@ -315,7 +319,7 @@ def _get_access_token(self) -> str: @functools.lru_cache(maxsize=None) def _get_credentials_email(self) -> str: """ - Returns the email address associated with the currently logged in account. + Return the email address associated with the currently logged in account. If a service account is used, it returns the service account. If user authentication (e.g. gcloud auth) is used, it returns the e-mail account of that user. @@ -341,7 +345,7 @@ def _get_credentials_email(self) -> str: return oauth2_client.tokeninfo().execute()["email"] def _authorize(self) -> google_auth_httplib2.AuthorizedHttp: - """Returns an authorized HTTP object to be used to build a Google cloud service hook connection.""" + """Return an authorized HTTP object to be used to build a Google cloud service hook connection.""" credentials = self.get_credentials() http = build_http() http = set_user_agent(http, "airflow/" + version.version) @@ -350,7 +354,7 @@ def _authorize(self) -> google_auth_httplib2.AuthorizedHttp: def _get_field(self, f: str, default: Any = None) -> Any: """ - Fetches a field from extras, and returns it. + Fetch a field from extras, and return it. This is some Airflow magic.
The google_cloud_platform hook type adds custom UI elements to the hook page, which allow admins to specify @@ -419,7 +423,7 @@ def scopes(self) -> Sequence[str]: @staticmethod def quota_retry(*args, **kwargs) -> Callable: - """Provides a mechanism to repeat requests in response to exceeding a temporary quota limit.""" + """Provide a mechanism to repeat requests in response to exceeding a temporary quota limit.""" def decorator(fun: Callable): default_kwargs = { @@ -435,7 +439,7 @@ def decorator(fun: Callable): @staticmethod def operation_in_progress_retry(*args, **kwargs) -> Callable[[T], T]: - """Provides a mechanism to repeat requests in response to operation in progress (HTTP 409) limit.""" + """Provide a mechanism to repeat requests in response to operation in progress (HTTP 409) limit.""" def decorator(fun: T): default_kwargs = { @@ -452,7 +456,7 @@ def decorator(fun: T): @staticmethod def fallback_to_default_project_id(func: Callable[..., RT]) -> Callable[..., RT]: """ - Decorator that provides fallback for Google Cloud project id. + Provide fallback for Google Cloud project id. To be used as a decorator. If the project is None it will be replaced with the project_id from the service account the Hook is authenticated with. Project id can be specified @@ -485,7 +489,7 @@ def inner_wrapper(self: GoogleBaseHook, *args, **kwargs) -> RT: @staticmethod def provide_gcp_credential_file(func: T) -> T: """ - Provides a Google Cloud credentials for Application Default Credentials (ADC) strategy support. + Provide Google Cloud credentials for Application Default Credentials (ADC) strategy support. It is recommended to use ``provide_gcp_credential_file_as_context`` context manager to limit the scope when authorization data is available. Using context @@ -502,7 +506,7 @@ def wrapper(self: GoogleBaseHook, *args, **kwargs): @contextmanager def provide_gcp_credential_file_as_context(self) -> Generator[str | None, None, None]: """ - Provides a Google Cloud credentials for Application Default Credentials (ADC) strategy support. + Provide Google Cloud credentials for Application Default Credentials (ADC) strategy support. See: `Application Default Credentials (ADC) @@ -538,7 +542,7 @@ def provide_gcp_credential_file_as_context(self) -> Generator[str | None, None, @contextmanager def provide_authorized_gcloud(self) -> Generator[None, None, None]: """ - Provides a separate gcloud configuration with current credentials. + Provide a separate gcloud configuration with current credentials. The gcloud tool allows you to login to Google Cloud only - ``gcloud auth login`` and for the needs of Application Default Credentials ``gcloud auth application-default login``. @@ -690,11 +694,15 @@ async def get_sync_hook(self) -> Any: return self._sync_hook async def get_token(self, *, session: ClientSession | None = None) -> _CredentialsToken: - """Returns a Token instance for use in [gcloud-aio](https://talkiq.github.io/gcloud-aio/) clients.""" + """Return a Token instance for use in [gcloud-aio](https://talkiq.github.io/gcloud-aio/) clients.""" sync_hook = await self.get_sync_hook() return await _CredentialsToken.from_hook(sync_hook, session=session) async def service_file_as_context(self) -> Any: - """This is the async equivalent of the non-async GoogleBaseHook's `provide_gcp_credential_file_as_context` method.""" + """ + Provide Google Cloud credentials for Application Default Credentials (ADC) strategy support.
+ + This is the async equivalent of the non-async GoogleBaseHook's `provide_gcp_credential_file_as_context` method. + """ sync_hook = await self.get_sync_hook() return await sync_to_async(sync_hook.provide_gcp_credential_file_as_context)() diff --git a/airflow/providers/google/common/hooks/discovery_api.py b/airflow/providers/google/common/hooks/discovery_api.py index 37e2d865812e8..b1316fc0c933c 100644 --- a/airflow/providers/google/common/hooks/discovery_api.py +++ b/airflow/providers/google/common/hooks/discovery_api.py @@ -66,7 +66,7 @@ def __init__( def get_conn(self) -> Resource: """ - Creates an authenticated api client for the given api service name and credentials. + Create an authenticated API client for the given API service name and credentials. :return: the authenticated api service. """ @@ -84,7 +84,7 @@ def get_conn(self) -> Resource: def query(self, endpoint: str, data: dict, paginate: bool = False, num_retries: int = 0) -> dict: """ - Creates a dynamic API call to any Google API registered in Google's API Client Library and queries it. + Create a dynamic API call to any Google API registered in Google's API Client Library and query it. :param endpoint: The client libraries path to the api call's executing method. For example: 'analyticsreporting.reports.batchGet' diff --git a/airflow/providers/google/common/utils/id_token_credentials.py b/airflow/providers/google/common/utils/id_token_credentials.py index c2b78bb2e1953..fc758569247c8 100644 --- a/airflow/providers/google/common/utils/id_token_credentials.py +++ b/airflow/providers/google/common/utils/id_token_credentials.py @@ -81,7 +81,7 @@ def _load_credentials_from_file( filename: str, target_audience: str | None ) -> google_auth_credentials.Credentials | None: """ - Loads credentials from a file. + Load credentials from a file. The credentials file must be a service account key or a stored authorized user credential. @@ -129,7 +129,7 @@ def _load_credentials_from_file( def _get_explicit_environ_credentials( target_audience: str | None, ) -> google_auth_credentials.Credentials | None: - """Gets credentials from the GOOGLE_APPLICATION_CREDENTIALS environment variable.""" + """Get credentials from the GOOGLE_APPLICATION_CREDENTIALS environment variable.""" explicit_file = os.environ.get(environment_vars.CREDENTIALS) if explicit_file is None: @@ -145,7 +145,7 @@ def _get_explicit_environ_credentials( def _get_gcloud_sdk_credentials( target_audience: str | None, ) -> google_auth_credentials.Credentials | None: - """Gets the credentials and project ID from the Cloud SDK.""" + """Get the credentials and project ID from the Cloud SDK.""" from google.auth import _cloud_sdk # type: ignore[attr-defined] # Check if application default credentials exist. @@ -162,7 +162,7 @@ def _get_gcloud_sdk_credentials( def _get_gce_credentials( target_audience: str | None, request: google.auth.transport.Request | None = None ) -> google_auth_credentials.Credentials | None: - """Gets credentials and project ID from the GCE Metadata Service.""" + """Get credentials and project ID from the GCE Metadata Service.""" # Ping requires a transport, but we want application default credentials # to require no arguments. So, we'll use the _http_client transport which # uses http.client.
This is only acceptable because the metadata server @@ -191,7 +191,7 @@ def _get_gce_credentials( def get_default_id_token_credentials( target_audience: str | None, request: google.auth.transport.Request = None ) -> google_auth_credentials.Credentials: - """Gets the default ID Token credentials for the current environment. + """Get the default ID Token credentials for the current environment. `Application Default Credentials`_ provides an easy way to obtain credentials to call Google APIs for server-to-server or local applications. diff --git a/airflow/providers/google/firebase/hooks/firestore.py b/airflow/providers/google/firebase/hooks/firestore.py index 58c9811829734..660806844ba5e 100644 --- a/airflow/providers/google/firebase/hooks/firestore.py +++ b/airflow/providers/google/firebase/hooks/firestore.py @@ -65,7 +65,7 @@ def __init__( def get_conn(self): """ - Retrieves the connection to Cloud Firestore. + Retrieve the connection to Cloud Firestore. :return: Google Cloud Firestore services object. """ @@ -86,7 +86,7 @@ def export_documents( self, body: dict, database_id: str = "(default)", project_id: str | None = None ) -> None: """ - Starts a export with the specified configuration. + Start an export with the specified configuration. :param database_id: The Database ID. :param body: The request body. @@ -110,7 +110,7 @@ def export_documents( def _wait_for_operation_to_complete(self, operation_name: str) -> None: """ - Waits for the named operation to complete - checks status of the asynchronous call. + Wait for the named operation to complete - check status of the asynchronous call. :param operation_name: The name of the operation. :return: The response returned by the operation. diff --git a/airflow/providers/google/leveldb/hooks/leveldb.py b/airflow/providers/google/leveldb/hooks/leveldb.py index 16d2773badd7d..dffc4665ba706 100644 --- a/airflow/providers/google/leveldb/hooks/leveldb.py +++ b/airflow/providers/google/leveldb/hooks/leveldb.py @@ -53,7 +53,7 @@ def __init__(self, leveldb_conn_id: str = default_conn_name): def get_conn(self, name: str = "/tmp/testdb/", create_if_missing: bool = False, **kwargs) -> DB: """ - Creates `Plyvel DB `__. + Create `Plyvel DB `__. :param name: path to create database e.g.
`/tmp/testdb/`) :param create_if_missing: whether a new database should be created if needed @@ -66,7 +66,7 @@ def get_conn(self, name: str = "/tmp/testdb/", create_if_missing: bool = False, return self.db def close_conn(self) -> None: - """Closes connection.""" + """Close connection.""" db = self.db if db is not None: db.close() diff --git a/airflow/providers/google/marketing_platform/hooks/analytics.py b/airflow/providers/google/marketing_platform/hooks/analytics.py index 996d4f9c0e2e6..d73a30637c597 100644 --- a/airflow/providers/google/marketing_platform/hooks/analytics.py +++ b/airflow/providers/google/marketing_platform/hooks/analytics.py @@ -55,7 +55,7 @@ def _paginate(self, resource: Resource, list_args: dict[str, Any] | None = None) return result def get_conn(self) -> Resource: - """Retrieves connection to Google Analytics 360.""" + """Retrieve connection to Google Analytics 360.""" if not self._conn: http_authorized = self._authorize() self._conn = build( @@ -67,7 +67,7 @@ def get_conn(self) -> Resource: return self._conn def list_accounts(self) -> list[dict[str, Any]]: - """Lists accounts list from Google Analytics 360.""" + """List accounts from Google Analytics 360.""" self.log.info("Retrieving accounts list...") conn = self.get_conn() accounts = conn.management().accounts() @@ -78,7 +78,7 @@ def get_ad_words_link( self, account_id: str, web_property_id: str, web_property_ad_words_link_id: str ) -> dict[str, Any]: """ - Returns a web property-Google Ads link to which the user has access. + Return a web property-Google Ads link to which the user has access. :param account_id: ID of the account which the given web property belongs to. :param web_property_id: Web property-Google Ads link UA-string. @@ -102,7 +102,7 @@ def get_ad_words_link( def list_ad_words_links(self, account_id: str, web_property_id: str) -> list[dict[str, Any]]: """ - Lists webProperty-Google Ads links for a given web property. + List webProperty-Google Ads links for a given web property. :param account_id: ID of the account which the given web property belongs to. :param web_property_id: Web property UA-string to retrieve the Google Ads links for. @@ -125,7 +125,7 @@ def upload_data( resumable_upload: bool = False, ) -> None: """ - Uploads file to GA via the Data Import API. + Upload a file to GA via the Data Import API. :param file_location: The path and name of the file to upload. :param account_id: The GA account Id to which the data upload belongs. @@ -162,7 +162,7 @@ def delete_upload_data( delete_request_body: dict[str, Any], ) -> None: """ - Deletes the uploaded data for a given account/property/dataset. + Delete the uploaded data for a given account/property/dataset. :param account_id: The GA account Id to which the data upload belongs. :param web_property_id: UA-string associated with the upload.
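For the renamed analytics-hook methods above, a short usage sketch. Assumptions not guaranteed by this diff: a configured default gcp_conn_id, and that each returned account dict carries the Management API's "id" and "name" keys:

    from airflow.providers.google.marketing_platform.hooks.analytics import GoogleAnalyticsHook

    hook = GoogleAnalyticsHook()  # assumes the default Google Cloud connection is configured
    for account in hook.list_accounts():  # signature as shown in the hunk above
        print(account["id"], account.get("name"))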
diff --git a/airflow/providers/google/marketing_platform/hooks/campaign_manager.py b/airflow/providers/google/marketing_platform/hooks/campaign_manager.py index 42e976db16078..72c34ff34af0b 100644 --- a/airflow/providers/google/marketing_platform/hooks/campaign_manager.py +++ b/airflow/providers/google/marketing_platform/hooks/campaign_manager.py @@ -49,7 +49,7 @@ def __init__( self.api_version = api_version def get_conn(self) -> Resource: - """Retrieves connection to Campaign Manager.""" + """Retrieve connection to Campaign Manager.""" if not self._conn: http_authorized = self._authorize() self._conn = build( @@ -62,7 +62,7 @@ def get_conn(self) -> Resource: def delete_report(self, profile_id: str, report_id: str) -> Any: """ - Deletes a report by its ID. + Delete a report by its ID. :param profile_id: The DFA user profile ID. :param report_id: The ID of the report. @@ -77,7 +77,7 @@ def delete_report(self, profile_id: str, report_id: str) -> Any: def insert_report(self, profile_id: str, report: dict[str, Any]) -> Any: """ - Creates a report. + Create a report. :param profile_id: The DFA user profile ID. :param report: The report resource to be inserted. @@ -99,7 +99,7 @@ def list_reports( sort_order: str | None = None, ) -> list[dict]: """ - Retrieves list of reports. + Retrieve list of reports. :param profile_id: The DFA user profile ID. :param max_results: Maximum number of results to return. @@ -125,7 +125,7 @@ def list_reports( def patch_report(self, profile_id: str, report_id: str, update_mask: dict) -> Any: """ - Updates a report. This method supports patch semantics. + Update a report. This method supports patch semantics. :param profile_id: The DFA user profile ID. :param report_id: The ID of the report. @@ -142,7 +142,7 @@ def patch_report(self, profile_id: str, report_id: str, update_mask: dict) -> An def run_report(self, profile_id: str, report_id: str, synchronous: bool | None = None) -> Any: """ - Runs a report. + Run a report. :param profile_id: The DFA profile ID. :param report_id: The ID of the report. @@ -158,7 +158,7 @@ def run_report(self, profile_id: str, report_id: str, synchronous: bool | None = def update_report(self, profile_id: str, report_id: str) -> Any: """ - Updates a report. + Update a report. :param profile_id: The DFA user profile ID. :param report_id: The ID of the report. @@ -173,7 +173,7 @@ def update_report(self, profile_id: str, report_id: str) -> Any: def get_report(self, file_id: str, profile_id: str, report_id: str) -> Any: """ - Retrieves a report file. + Retrieve a report file. :param profile_id: The DFA user profile ID. :param report_id: The ID of the report. @@ -190,7 +190,7 @@ def get_report(self, file_id: str, profile_id: str, report_id: str) -> Any: def get_report_file(self, file_id: str, profile_id: str, report_id: str) -> http.HttpRequest: """ - Retrieves a media part of report file. + Retrieve a media part of a report file. :param profile_id: The DFA user profile ID. :param report_id: The ID of the report. @@ -234,7 +234,7 @@ def conversions_batch_insert( max_failed_inserts: int = 0, ) -> Any: """ - Inserts conversions. + Insert conversions. :param profile_id: User profile ID associated with this request. :param conversions: Conversations to insert, should by type of Conversation: @@ -278,7 +278,7 @@ def conversions_batch_update( max_failed_updates: int = 0, ) -> Any: """ - Updates existing conversions. + Update existing conversions. :param profile_id: User profile ID associated with this request.
:param conversions: Conversations to update, should by type of Conversation: diff --git a/airflow/providers/google/marketing_platform/hooks/display_video.py b/airflow/providers/google/marketing_platform/hooks/display_video.py index d7927f709e547..373b3824eb6cb 100644 --- a/airflow/providers/google/marketing_platform/hooks/display_video.py +++ b/airflow/providers/google/marketing_platform/hooks/display_video.py @@ -45,7 +45,7 @@ def __init__( self.api_version = api_version def get_conn(self) -> Resource: - """Retrieves connection to DisplayVideo.""" + """Retrieve connection to DisplayVideo.""" if not self._conn: http_authorized = self._authorize() self._conn = build( @@ -57,7 +57,7 @@ def get_conn(self) -> Resource: return self._conn def get_conn_to_display_video(self) -> Resource: - """Retrieves connection to DisplayVideo.""" + """Retrieve connection to DisplayVideo.""" if not self._conn: http_authorized = self._authorize() self._conn = build( @@ -89,7 +89,7 @@ def erf_uri(partner_id, entity_type) -> list[str]: def create_query(self, query: dict[str, Any]) -> dict: """ - Creates a query. + Create a query. :param query: Query object to be passed to request body. """ @@ -98,7 +98,7 @@ def create_query(self, query: dict[str, Any]) -> dict: def delete_query(self, query_id: str) -> None: """ - Deletes a stored query as well as the associated stored reports. + Delete a stored query as well as the associated stored reports. :param query_id: Query ID to delete. """ @@ -106,7 +106,7 @@ def delete_query(self, query_id: str) -> None: def get_query(self, query_id: str) -> dict: """ - Retrieves a stored query. + Retrieve a stored query. :param query_id: Query ID to retrieve. """ @@ -114,13 +114,13 @@ def get_query(self, query_id: str) -> dict: return response def list_queries(self) -> list[dict]: - """Retrieves stored queries.""" + """Retrieve stored queries.""" response = self.get_conn().queries().list().execute(num_retries=self.num_retries) return response.get("queries", []) def run_query(self, query_id: str, params: dict[str, Any] | None) -> dict: """ - Runs a stored query to generate a report. + Run a stored query to generate a report. :param query_id: Query ID to run. :param params: Parameters for the report. @@ -131,7 +131,7 @@ def run_query(self, query_id: str, params: dict[str, Any] | None) -> dict: def get_report(self, query_id: str, report_id: str) -> dict: """ - Retrieves a report. + Retrieve a report. :param query_id: Query ID for which report was generated. :param report_id: Report ID to retrieve. @@ -146,7 +146,7 @@ def get_report(self, query_id: str, report_id: str) -> dict: def upload_line_items(self, line_items: Any) -> list[dict[str, Any]]: """ - Uploads line items in CSV format. + Upload line items in CSV format. :param line_items: downloaded data from GCS and passed to the body request :return: response body. @@ -167,7 +167,7 @@ def upload_line_items(self, line_items: Any) -> list[dict[str, Any]]: def download_line_items(self, request_body: dict[str, Any]) -> list[Any]: """ - Retrieves line items in CSV format. + Retrieve line items in CSV format. :param request_body: dictionary with parameters that should be passed into. More information about it can be found here: @@ -183,7 +183,7 @@ def download_line_items(self, request_body: dict[str, Any]) -> list[Any]: def create_sdf_download_operation(self, body_request: dict[str, Any]) -> dict[str, Any]: """ - Creates an SDF Download Task and Returns an Operation. + Create an SDF Download Task and return an Operation.

         :param body_request: Body request.

@@ -200,7 +200,7 @@ def create_sdf_download_operation(self, body_request: dict[str, Any]) -> dict[st

     def get_sdf_download_operation(self, operation_name: str):
         """
-        Gets the latest state of an asynchronous SDF download task operation.
+        Get the latest state of an asynchronous SDF download task operation.

         :param operation_name: The name of the operation resource.
         """
@@ -215,7 +215,7 @@ def get_sdf_download_operation(self, operation_name: str):

     def download_media(self, resource_name: str):
         """
-        Downloads media.
+        Download media.

         :param resource_name: of the media that is being downloaded.
         """
diff --git a/airflow/providers/google/marketing_platform/hooks/search_ads.py b/airflow/providers/google/marketing_platform/hooks/search_ads.py
index 859878c0f10fc..22ee48bea8642 100644
--- a/airflow/providers/google/marketing_platform/hooks/search_ads.py
+++ b/airflow/providers/google/marketing_platform/hooks/search_ads.py
@@ -45,7 +45,7 @@ def __init__(
         self.api_version = api_version

     def get_conn(self):
-        """Retrieves connection to Google SearchAds."""
+        """Retrieve connection to Google SearchAds."""
         if not self._conn:
             http_authorized = self._authorize()
             self._conn = build(
@@ -58,7 +58,7 @@ def get_conn(self):

     def insert_report(self, report: dict[str, Any]) -> Any:
         """
-        Inserts a report request into the reporting system.
+        Insert a report request into the reporting system.

         :param report: Report to be generated.
         """
@@ -67,7 +67,7 @@ def insert_report(self, report: dict[str, Any]) -> Any:

     def get(self, report_id: str) -> Any:
         """
-        Polls for the status of a report request.
+        Poll for the status of a report request.

         :param report_id: ID of the report request being polled.
         """
@@ -76,7 +76,7 @@ def get(self, report_id: str) -> Any:

     def get_file(self, report_fragment: int, report_id: str) -> Any:
         """
-        Downloads a report file encoded in UTF-8.
+        Download a report file encoded in UTF-8.

         :param report_fragment: The index of the report fragment to download.
         :param report_id: ID of the report.
diff --git a/airflow/providers/google/suite/hooks/calendar.py b/airflow/providers/google/suite/hooks/calendar.py
index 22b9f127612ca..3359300c50741 100644
--- a/airflow/providers/google/suite/hooks/calendar.py
+++ b/airflow/providers/google/suite/hooks/calendar.py
@@ -69,7 +69,7 @@ def __init__(

     def get_conn(self) -> Any:
         """
-        Retrieves connection to Google Calendar.
+        Retrieve connection to Google Calendar.

         :return: Google Calendar services object.
         """
@@ -99,7 +99,7 @@ def get_events(
         updated_min: datetime | None = None,
     ) -> list:
         """
-        Gets events from Google Calendar from a single calendar_id.
+        Get events from Google Calendar from a single calendar_id.

         https://developers.google.com/calendar/api/v3/reference/events/list

diff --git a/airflow/providers/google/suite/hooks/drive.py b/airflow/providers/google/suite/hooks/drive.py
index 89d91378801c1..1e4f58b5f724b 100644
--- a/airflow/providers/google/suite/hooks/drive.py
+++ b/airflow/providers/google/suite/hooks/drive.py
@@ -64,7 +64,7 @@ def __init__(

     def get_conn(self) -> Any:
         """
-        Retrieves the connection to Google Drive.
+        Retrieve the connection to Google Drive.

         :return: Google Drive services object.
         """
@@ -132,7 +132,7 @@ def _ensure_folders_exists(self, path: str, folder_id: str) -> str:

     def get_media_request(self, file_id: str) -> HttpRequest:
         """
-        Returns a get_media http request to a Google Drive object.
+        Return a get_media http request to a Google Drive object.

         :param file_id: The Google Drive file id
         :return: request
@@ -145,7 +145,7 @@ def exists(
         self, folder_id: str, file_name: str, drive_id: str | None = None, *, include_trashed: bool = True
     ) -> bool:
         """
-        Checks to see if a file exists within a Google Drive folder.
+        Check to see if a file exists within a Google Drive folder.

         :param folder_id: The id of the Google Drive folder in which the file resides
         :param file_name: The name of a file in Google Drive
@@ -162,7 +162,7 @@ def exists(

     def _get_file_info(self, file_id: str):
         """
-        Returns Google API file_info object containing id, name, parents in the response.
+        Return Google API file_info object containing id, name, parents in the response.

         https://developers.google.com/drive/api/v3/reference/files/get

@@ -183,7 +183,7 @@ def _get_file_info(self, file_id: str):

     def _resolve_file_path(self, file_id: str) -> str:
         """
-        Returns the full Google Drive path for given file_id.
+        Return the full Google Drive path for given file_id.

         :param file_id: The id of a file in Google Drive
         :return: Google Drive full path for a file
@@ -211,7 +211,7 @@ def get_file_id(
         self, folder_id: str, file_name: str, drive_id: str | None = None, *, include_trashed: bool = True
     ) -> dict:
         """
-        Returns the file id of a Google Drive file.
+        Return the file id of a Google Drive file.

         :param folder_id: The id of the Google Drive folder in which the file resides
         :param file_name: The name of a file in Google Drive
@@ -264,7 +264,7 @@ def upload_file(
         show_full_target_path: bool = True,
     ) -> str:
         """
-        Uploads a file that is available locally to a Google Drive service.
+        Upload a file that is available locally to a Google Drive service.

         :param local_location: The path where the file is available.
         :param remote_location: The path where the file will be sent
diff --git a/airflow/providers/google/suite/hooks/sheets.py b/airflow/providers/google/suite/hooks/sheets.py
index 7a4ae4479ec64..050b9527345e2 100644
--- a/airflow/providers/google/suite/hooks/sheets.py
+++ b/airflow/providers/google/suite/hooks/sheets.py
@@ -66,7 +66,7 @@ def __init__(

     def get_conn(self) -> Any:
         """
-        Retrieves connection to Google Sheets.
+        Retrieve connection to Google Sheets.

         :return: Google Sheets services object.
         """
@@ -85,7 +85,7 @@ def get_values(
         date_time_render_option: str = "SERIAL_NUMBER",
     ) -> list:
         """
-        Gets values from Google Sheet from a single range.
+        Get values from Google Sheet from a single range.

         https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/get

@@ -125,7 +125,7 @@ def batch_get_values(
         date_time_render_option: str = "SERIAL_NUMBER",
     ) -> dict:
         """
-        Gets values from Google Sheet from a list of ranges.
+        Get values from Google Sheet from a list of ranges.

         https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/batchGet

@@ -168,7 +168,7 @@ def update_values(
         date_time_render_option: str = "SERIAL_NUMBER",
     ) -> dict:
         """
-        Updates values from Google Sheet from a single range.
+        Update values from Google Sheet from a single range.

         https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/update

@@ -219,7 +219,7 @@ def batch_update_values(
         date_time_render_option: str = "SERIAL_NUMBER",
     ) -> dict:
         """
-        Updates values from Google Sheet for multiple ranges.
+        Update values from Google Sheet for multiple ranges.

         https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/batchUpdate

@@ -365,7 +365,7 @@ def batch_clear(self, spreadsheet_id: str, ranges: list) -> dict:

     def get_spreadsheet(self, spreadsheet_id: str):
         """
-        Retrieves spreadsheet matching the given id.
+        Retrieve spreadsheet matching the given id.

         :param spreadsheet_id: The spreadsheet id.
         :return: A spreadsheet that matches the sheet filter.
@@ -380,7 +380,7 @@ def get_spreadsheet(self, spreadsheet_id: str):

     def get_sheet_titles(self, spreadsheet_id: str, sheet_filter: list[str] | None = None):
         """
-        Retrieves the sheet titles from a spreadsheet matching the given id and sheet filter.
+        Retrieve the sheet titles from a spreadsheet matching the given id and sheet filter.

         :param spreadsheet_id: The spreadsheet id.
         :param sheet_filter: List of sheet titles to retrieve from sheet.
@@ -401,7 +401,7 @@ def get_sheet_titles(self, spreadsheet_id: str, sheet_filter: list[str] | None =

     def create_spreadsheet(self, spreadsheet: dict[str, Any]) -> dict[str, Any]:
         """
-        Creates a spreadsheet, returning the newly created spreadsheet.
+        Create a spreadsheet, returning the newly created spreadsheet.

         :param spreadsheet: an instance of Spreadsheet
             https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets#Spreadsheet
diff --git a/pyproject.toml b/pyproject.toml
index 4ab33e9c1e40e..f8e949739e42b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1445,51 +1445,10 @@ combine-as-imports = true
 "airflow/providers/google/cloud/transfers/cassandra_to_gcs.py" = ["D401"]
 "airflow/providers/google/cloud/transfers/gcs_to_bigquery.py" = ["D401"]
 "airflow/providers/google/cloud/transfers/gcs_to_gcs.py" = ["D401"]
-"airflow/providers/google/cloud/transfers/gcs_to_sftp.py" = ["D401"]
 "airflow/providers/google/cloud/transfers/local_to_gcs.py" = ["D401"]
-"airflow/providers/google/cloud/transfers/mssql_to_gcs.py" = ["D401"]
-"airflow/providers/google/cloud/transfers/mysql_to_gcs.py" = ["D401"]
-"airflow/providers/google/cloud/transfers/oracle_to_gcs.py" = ["D401"]
-"airflow/providers/google/cloud/transfers/postgres_to_gcs.py" = ["D401"]
-"airflow/providers/google/cloud/transfers/presto_to_gcs.py" = ["D401"]
 "airflow/providers/google/cloud/transfers/s3_to_gcs.py" = ["D401"]
-"airflow/providers/google/cloud/transfers/sftp_to_gcs.py" = ["D401"]
-"airflow/providers/google/cloud/transfers/sql_to_gcs.py" = ["D401"]
 "airflow/providers/google/cloud/transfers/trino_to_gcs.py" = ["D401"]
-"airflow/providers/google/cloud/triggers/bigquery.py" = ["D401"]
-"airflow/providers/google/cloud/triggers/bigquery_dts.py" = ["D401"]
-"airflow/providers/google/cloud/triggers/cloud_batch.py" = ["D401"]
-"airflow/providers/google/cloud/triggers/cloud_build.py" = ["D401"]
-"airflow/providers/google/cloud/triggers/cloud_run.py" = ["D401"]
-"airflow/providers/google/cloud/triggers/cloud_storage_transfer_service.py" = ["D401"]
-"airflow/providers/google/cloud/triggers/dataflow.py" = ["D401"]
-"airflow/providers/google/cloud/triggers/datafusion.py" = ["D401"]
-"airflow/providers/google/cloud/triggers/dataplex.py" = ["D401"]
-"airflow/providers/google/cloud/triggers/dataproc.py" = ["D401"]
-"airflow/providers/google/cloud/triggers/gcs.py" = ["D401"]
-"airflow/providers/google/cloud/triggers/kubernetes_engine.py" = ["D401"]
-"airflow/providers/google/cloud/triggers/mlengine.py" = ["D401"]
-"airflow/providers/google/cloud/triggers/pubsub.py" = ["D401"]
-"airflow/providers/google/cloud/utils/bigquery.py" = ["D401"]
-"airflow/providers/google/cloud/utils/credentials_provider.py" = ["D401"] -"airflow/providers/google/cloud/utils/dataform.py" = ["D401"] -"airflow/providers/google/cloud/utils/field_validator.py" = ["D401"] -"airflow/providers/google/cloud/utils/helpers.py" = ["D401"] -"airflow/providers/google/cloud/utils/mlengine_operator_utils.py" = ["D401"] -"airflow/providers/google/cloud/utils/mlengine_prediction_summary.py" = ["D401"] "airflow/providers/google/common/auth_backend/google_openid.py" = ["D401"] -"airflow/providers/google/common/hooks/base_google.py" = ["D401"] -"airflow/providers/google/common/hooks/discovery_api.py" = ["D401"] -"airflow/providers/google/common/utils/id_token_credentials.py" = ["D401"] -"airflow/providers/google/firebase/hooks/firestore.py" = ["D401"] -"airflow/providers/google/leveldb/hooks/leveldb.py" = ["D401"] -"airflow/providers/google/marketing_platform/hooks/analytics.py" = ["D401"] -"airflow/providers/google/marketing_platform/hooks/campaign_manager.py" = ["D401"] -"airflow/providers/google/marketing_platform/hooks/display_video.py" = ["D401"] -"airflow/providers/google/marketing_platform/hooks/search_ads.py" = ["D401"] -"airflow/providers/google/suite/hooks/calendar.py" = ["D401"] -"airflow/providers/google/suite/hooks/drive.py" = ["D401"] -"airflow/providers/google/suite/hooks/sheets.py" = ["D401"] "airflow/providers/hashicorp/_internal_client/vault_client.py" = ["D401"] "airflow/providers/hashicorp/hooks/vault.py" = ["D401"] "airflow/providers/imap/hooks/imap.py" = ["D401"]