D401 lint fixes for google provider (#37304)

---------

Signed-off-by: kalyanr <kalyan.ben10@live.com>
rawwar committed Feb 10, 2024
1 parent bb414f0 commit 0a8e771
Showing 46 changed files with 165 additions and 190 deletions.
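For context: D401 is the pydocstyle/ruff rule "First line of docstring should be in imperative mood", which is why the hunks below reword summaries like "Queries ..." and "Serializes ..." into "Query ..." and "Serialize ...". A minimal before/after sketch of the rule (the function and docstrings are made up for illustration, not taken from this commit):

# Hypothetical example, not part of the diff below.

# Before -- flagged by D401 because "Uploads" is third person:
def upload_file(path: str) -> None:
    """Uploads the given file to GCS."""

# After -- compliant because "Upload" is imperative:
def upload_file(path: str) -> None:
    """Upload the given file to GCS."""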
8 changes: 4 additions & 4 deletions airflow/providers/google/cloud/transfers/cassandra_to_gcs.py
@@ -218,7 +218,7 @@ def _write_local_data_files(self, cursor):

def _write_local_schema_file(self, cursor):
"""
- Takes a cursor, and writes the BigQuery schema for the results to a local file system.
+ Take a cursor, and writes the BigQuery schema for the results to a local file system.
:return: A dictionary where key is a filename to be used as an object
name in GCS, and values are file handles to local files that
@@ -253,7 +253,7 @@ def _upload_to_gcs(self, file_to_upload):
)

def generate_data_dict(self, names: Iterable[str], values: Any) -> dict[str, Any]:
"""Generates data structure that will be stored as file in GCS."""
"""Generate data structure that will be stored as file in GCS."""
return {n: self.convert_value(v) for n, v in zip(names, values)}

def convert_value(self, value: Any | None) -> Any | None:
@@ -285,12 +285,12 @@ def convert_value(self, value: Any | None) -> Any | None:
raise AirflowException(f"Unexpected value: {value}")

def convert_array_types(self, value: list[Any] | SortedSet) -> list[Any]:
"""Maps convert_value over array."""
"""Map convert_value over array."""
return [self.convert_value(nested_value) for nested_value in value]

def convert_user_type(self, value: Any) -> dict[str, Any]:
"""
- Converts a user type to RECORD that contains n fields, where n is the number of attributes.
+ Convert a user type to RECORD that contains n fields, where n is the number of attributes.
Each element in the user type class will be converted to its corresponding data type in BQ.
"""
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/gcs_to_sftp.py
@@ -176,7 +176,7 @@ def _copy_single_object(
source_object: str,
destination_path: str,
) -> None:
"""Helper function to copy single object."""
"""Copy single object."""
self.log.info(
"Executing copy of gs://%s/%s to %s",
self.source_bucket,
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/mssql_to_gcs.py
@@ -76,7 +76,7 @@ def __init__(

def query(self):
"""
- Queries MSSQL and returns a cursor of results.
+ Query MSSQL and returns a cursor of results.
:return: mssql cursor
"""
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/mysql_to_gcs.py
@@ -67,7 +67,7 @@ def __init__(self, *, mysql_conn_id="mysql_default", ensure_utc=False, **kwargs)
self.ensure_utc = ensure_utc

def query(self):
"""Queries mysql and returns a cursor to the results."""
"""Query mysql and returns a cursor to the results."""
mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
conn = mysql.get_conn()
cursor = conn.cursor()
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/oracle_to_gcs.py
@@ -62,7 +62,7 @@ def __init__(self, *, oracle_conn_id="oracle_default", ensure_utc=False, **kwarg
self.oracle_conn_id = oracle_conn_id

def query(self):
"""Queries Oracle and returns a cursor to the results."""
"""Query Oracle and returns a cursor to the results."""
oracle = OracleHook(oracle_conn_id=self.oracle_conn_id)
conn = oracle.get_conn()
cursor = conn.cursor()
@@ -115,7 +115,7 @@ def _unique_name(self):
return f"{self.dag_id}__{self.task_id}__{uuid.uuid4()}" if self.use_server_side_cursor else None

def query(self):
"""Queries Postgres and returns a cursor to the results."""
"""Query Postgres and returns a cursor to the results."""
hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
conn = hook.get_conn()
cursor = conn.cursor(name=self._unique_name())
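A side note on the Postgres hunk above: the cursor is created with conn.cursor(name=self._unique_name()), and in psycopg2 a named cursor is a server-side cursor, so large result sets are streamed in batches instead of being loaded into memory at once. A minimal standalone sketch of that idea (the DSN and table are placeholders, not from this provider):

import psycopg2  # assumes psycopg2 is installed

conn = psycopg2.connect("dbname=example")  # placeholder DSN
cursor = conn.cursor(name="streaming_cursor")  # named => server-side cursor
cursor.itersize = 2000  # rows fetched per network round trip
cursor.execute("SELECT * FROM big_table")  # placeholder table
for row in cursor:
    print(row)  # rows arrive in batches rather than all at once
cursor.close()
conn.close()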
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/presto_to_gcs.py
@@ -181,7 +181,7 @@ def __init__(self, *, presto_conn_id: str = "presto_default", **kwargs):
self.presto_conn_id = presto_conn_id

def query(self):
"""Queries presto and returns a cursor to the results."""
"""Query presto and returns a cursor to the results."""
presto = PrestoHook(presto_conn_id=self.presto_conn_id)
conn = presto.get_conn()
cursor = conn.cursor()
6 changes: 3 additions & 3 deletions airflow/providers/google/cloud/transfers/s3_to_gcs.py
@@ -219,7 +219,7 @@ def exclude_existing_objects(self, s3_objects: list[str], gcs_hook: GCSHook) ->

def s3_to_gcs_object(self, s3_object: str) -> str:
"""
- Transforms S3 path to GCS path according to the operator's logic.
+ Transform S3 path to GCS path according to the operator's logic.
If apply_gcs_prefix == True then <s3_prefix><content> => <gcs_prefix><content>
If apply_gcs_prefix == False then <s3_prefix><content> => <gcs_prefix><s3_prefix><content>
@@ -233,7 +233,7 @@ def s3_to_gcs_object(self, s3_object: str) -> str:

def gcs_to_s3_object(self, gcs_object: str) -> str:
"""
- Transforms GCS path to S3 path according to the operator's logic.
+ Transform GCS path to S3 path according to the operator's logic.
If apply_gcs_prefix == True then <gcs_prefix><content> => <s3_prefix><content>
If apply_gcs_prefix == False then <gcs_prefix><s3_prefix><content> => <s3_prefix><content>
@@ -261,7 +261,7 @@ def transfer_files(self, s3_objects: list[str], gcs_hook: GCSHook, s3_hook: S3Ho
self.log.info("All done, uploaded %d files to Google Cloud Storage", len(s3_objects))

def transfer_files_async(self, files: list[str], gcs_hook: GCSHook, s3_hook: S3Hook) -> None:
"""Submits Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS."""
"""Submit Google Cloud Storage Transfer Service job to copy files from AWS S3 to GCS."""
if not len(files):
raise ValueError("List of transferring files cannot be empty")
job_names = self.submit_transfer_jobs(files=files, gcs_hook=gcs_hook, s3_hook=s3_hook)
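The two path-mapping docstrings above describe a plain prefix swap. A rough standalone sketch of that behavior under the stated rules (the free function and its arguments are illustrative, not the operator's actual code):

def s3_to_gcs_object(s3_object: str, s3_prefix: str, gcs_prefix: str, apply_gcs_prefix: bool) -> str:
    # Illustration of the mapping described in the docstrings above.
    if apply_gcs_prefix:
        # <s3_prefix><content> => <gcs_prefix><content>
        return gcs_prefix + s3_object.removeprefix(s3_prefix)
    # <s3_prefix><content> => <gcs_prefix><s3_prefix><content>
    return gcs_prefix + s3_object

print(s3_to_gcs_object("raw/file.csv", "raw/", "backup/", True))   # backup/file.csv
print(s3_to_gcs_object("raw/file.csv", "raw/", "backup/", False))  # backup/raw/file.csv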
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/sftp_to_gcs.py
@@ -147,7 +147,7 @@ def _copy_single_object(
source_path: str,
destination_object: str,
) -> None:
"""Helper function to copy single object."""
"""Copy single object."""
self.log.info(
"Executing copy of %s to gs://%s/%s",
source_path,
6 changes: 3 additions & 3 deletions airflow/providers/google/cloud/transfers/sql_to_gcs.py
@@ -228,7 +228,7 @@ def _write_rows_to_parquet(parquet_writer: pq.ParquetWriter, rows):

def _write_local_data_files(self, cursor):
"""
- Takes a cursor, and writes results to a local file.
+ Take a cursor, and writes results to a local file.
:return: A dictionary where keys are filenames to be used as object
names in GCS, and values are file handles to local files that
@@ -348,7 +348,7 @@ def _write_local_data_files(self, cursor):
yield file_to_upload

def _get_file_to_upload(self, file_mime_type, file_no):
"""Returns a dictionary that represents the file to upload."""
"""Return a dictionary that represents the file to upload."""
tmp_file_handle = NamedTemporaryFile(mode="w", encoding="utf-8", delete=True)
return (
{
@@ -435,7 +435,7 @@ def _get_col_type_dict(self):

def _write_local_schema_file(self, cursor):
"""
- Takes a cursor, and writes the BigQuery schema for the results to a local file system.
+ Take a cursor, and writes the BigQuery schema for the results to a local file system.
Schema for database will be read from cursor if not specified.
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/transfers/trino_to_gcs.py
@@ -181,7 +181,7 @@ def __init__(self, *, trino_conn_id: str = "trino_default", **kwargs):
self.trino_conn_id = trino_conn_id

def query(self):
"""Queries trino and returns a cursor to the results."""
"""Query trino and returns a cursor to the results."""
trino = TrinoHook(trino_conn_id=self.trino_conn_id)
conn = trino.get_conn()
cursor = conn.cursor()
24 changes: 12 additions & 12 deletions airflow/providers/google/cloud/triggers/bigquery.py
@@ -68,7 +68,7 @@ def __init__(
self.impersonation_chain = impersonation_chain

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serializes BigQueryInsertJobTrigger arguments and classpath."""
"""Serialize BigQueryInsertJobTrigger arguments and classpath."""
return (
"airflow.providers.google.cloud.triggers.bigquery.BigQueryInsertJobTrigger",
{
@@ -83,7 +83,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
)

async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent."""
"""Get current job execution status and yields a TriggerEvent."""
hook = self._get_async_hook()
try:
while True:
@@ -119,7 +119,7 @@ class BigQueryCheckTrigger(BigQueryInsertJobTrigger):
"""BigQueryCheckTrigger run on the trigger worker."""

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serializes BigQueryCheckTrigger arguments and classpath."""
"""Serialize BigQueryCheckTrigger arguments and classpath."""
return (
"airflow.providers.google.cloud.triggers.bigquery.BigQueryCheckTrigger",
{
@@ -134,7 +134,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
)

async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent."""
"""Get current job execution status and yields a TriggerEvent."""
hook = self._get_async_hook()
try:
while True:
@@ -193,7 +193,7 @@ def __init__(self, as_dict: bool = False, **kwargs):
self.as_dict = as_dict

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serializes BigQueryInsertJobTrigger arguments and classpath."""
"""Serialize BigQueryInsertJobTrigger arguments and classpath."""
return (
"airflow.providers.google.cloud.triggers.bigquery.BigQueryGetDataTrigger",
{
@@ -209,7 +209,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
)

async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent with response data."""
"""Get current job execution status and yields a TriggerEvent with response data."""
hook = self._get_async_hook()
try:
while True:
@@ -307,7 +307,7 @@ def __init__(
self.ignore_zero = ignore_zero

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serializes BigQueryCheckTrigger arguments and classpath."""
"""Serialize BigQueryCheckTrigger arguments and classpath."""
return (
"airflow.providers.google.cloud.triggers.bigquery.BigQueryIntervalCheckTrigger",
{
@@ -326,7 +326,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
)

async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent."""
"""Get current job execution status and yields a TriggerEvent."""
hook = self._get_async_hook()
try:
while True:
@@ -452,7 +452,7 @@ def __init__(
self.tolerance = tolerance

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serializes BigQueryValueCheckTrigger arguments and classpath."""
"""Serialize BigQueryValueCheckTrigger arguments and classpath."""
return (
"airflow.providers.google.cloud.triggers.bigquery.BigQueryValueCheckTrigger",
{
@@ -470,7 +470,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
)

async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current job execution status and yields a TriggerEvent."""
"""Get current job execution status and yields a TriggerEvent."""
hook = self._get_async_hook()
try:
while True:
@@ -536,7 +536,7 @@ def __init__(
self.impersonation_chain = impersonation_chain

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serializes BigQueryTableExistenceTrigger arguments and classpath."""
"""Serialize BigQueryTableExistenceTrigger arguments and classpath."""
return (
"airflow.providers.google.cloud.triggers.bigquery.BigQueryTableExistenceTrigger",
{
@@ -623,7 +623,7 @@ def __init__(self, partition_id: str, **kwargs):
self.partition_id = partition_id

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serializes BigQueryTablePartitionExistenceTrigger arguments and classpath."""
"""Serialize BigQueryTablePartitionExistenceTrigger arguments and classpath."""
return (
"airflow.providers.google.cloud.triggers.bigquery.BigQueryTablePartitionExistenceTrigger",
{
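The serialize() methods touched above all follow the same pattern: return the trigger's classpath plus the keyword arguments needed to rebuild it later, which is how deferred triggers get reconstructed on the triggerer. A simplified, stdlib-only sketch of how such a (classpath, kwargs) tuple can be consumed (not Airflow's actual rehydration code):

from importlib import import_module
from typing import Any

def rebuild_trigger(classpath: str, kwargs: dict[str, Any]) -> Any:
    """Import the class named by classpath and instantiate it with the serialized kwargs."""
    module_path, _, class_name = classpath.rpartition(".")
    cls = getattr(import_module(module_path), class_name)
    return cls(**kwargs)

# e.g. rebuild_trigger(
#     "airflow.providers.google.cloud.triggers.bigquery.BigQueryInsertJobTrigger",
#     {"job_id": "job-123", ...},  # illustrative kwargs only
# )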
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/triggers/bigquery_dts.py
@@ -66,7 +66,7 @@ def __init__(
self.impersonation_chain = impersonation_chain

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serializes class arguments and classpath."""
"""Serialize class arguments and classpath."""
return (
"airflow.providers.google.cloud.triggers.bigquery_dts.BigQueryDataTransferRunTrigger",
{
4 changes: 3 additions & 1 deletion airflow/providers/google/cloud/triggers/cloud_batch.py
@@ -67,7 +67,7 @@ def __init__(
self.impersonation_chain = impersonation_chain

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serializes class arguments and classpath."""
"""Serialize class arguments and classpath."""
return (
"airflow.providers.google.cloud.triggers.cloud_batch.CloudBatchJobFinishedTrigger",
{
@@ -83,6 +83,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]:

async def run(self) -> AsyncIterator[TriggerEvent]:
"""
+ Fetch job status or yield certain Events.
Main loop of the class in where it is fetching the job status and yields certain Event.
If the job has status success then it yields TriggerEvent with success status, if job has
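The run() methods in these trigger modules all share the loop shape the docstrings describe: poll the job status, yield a success or error TriggerEvent when the job finishes, otherwise sleep and poll again. A bare-bones sketch of that shape (get_job_status is a made-up stand-in for the provider hook call, and plain dicts stand in for TriggerEvent):

import asyncio
from collections.abc import AsyncIterator
from typing import Any

async def get_job_status(job_id: str) -> str:
    # Hypothetical stand-in; a real trigger would ask the cloud service via its async hook.
    return "success"

async def run(job_id: str, poll_sleep: float = 10.0) -> AsyncIterator[dict[str, Any]]:
    while True:
        status = await get_job_status(job_id)
        if status == "success":
            yield {"status": "success", "job_id": job_id}
            return
        if status == "error":
            yield {"status": "error", "job_id": job_id}
            return
        await asyncio.sleep(poll_sleep)  # job still running -- wait, then poll again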
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/triggers/cloud_build.py
@@ -62,7 +62,7 @@ def __init__(
self.location = location

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serializes CloudBuildCreateBuildTrigger arguments and classpath."""
"""Serialize CloudBuildCreateBuildTrigger arguments and classpath."""
return (
"airflow.providers.google.cloud.triggers.cloud_build.CloudBuildCreateBuildTrigger",
{
@@ -76,7 +76,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
)

async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current build execution status and yields a TriggerEvent."""
"""Get current build execution status and yields a TriggerEvent."""
hook = self._get_async_hook()
try:
while True:
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/triggers/cloud_run.py
@@ -81,7 +81,7 @@ def __init__(
self.impersonation_chain = impersonation_chain

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serializes class arguments and classpath."""
"""Serialize class arguments and classpath."""
return (
"airflow.providers.google.cloud.triggers.cloud_run.CloudRunJobFinishedTrigger",
{
@@ -46,7 +46,7 @@ def __init__(self, job_names: list[str], project_id: str | None = None, poll_int
self.poll_interval = poll_interval

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serializes StorageTransferJobsTrigger arguments and classpath."""
"""Serialize StorageTransferJobsTrigger arguments and classpath."""
return (
f"{self.__class__.__module__ }.{self.__class__.__qualname__}",
{
@@ -57,7 +57,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
)

async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current data storage transfer jobs and yields a TriggerEvent."""
"""Get current data storage transfer jobs and yields a TriggerEvent."""
async_hook: CloudDataTransferServiceAsyncHook = self.get_async_hook()

while True:
4 changes: 3 additions & 1 deletion airflow/providers/google/cloud/triggers/dataflow.py
@@ -69,7 +69,7 @@ def __init__(
self.cancel_timeout = cancel_timeout

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serializes class arguments and classpath."""
"""Serialize class arguments and classpath."""
return (
"airflow.providers.google.cloud.triggers.dataflow.TemplateJobStartTrigger",
{
@@ -85,6 +85,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]:

async def run(self):
"""
+ Fetch job status or yield certain Events.
Main loop of the class in where it is fetching the job status and yields certain Event.
If the job has status success then it yields TriggerEvent with success status, if job has
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/triggers/datafusion.py
@@ -71,7 +71,7 @@ def __init__(
self.success_states = success_states

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serializes DataFusionStartPipelineTrigger arguments and classpath."""
"""Serialize DataFusionStartPipelineTrigger arguments and classpath."""
return (
"airflow.providers.google.cloud.triggers.datafusion.DataFusionStartPipelineTrigger",
{
@@ -86,7 +86,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
)

async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
"""Gets current pipeline status and yields a TriggerEvent."""
"""Get current pipeline status and yields a TriggerEvent."""
hook = self._get_async_hook()
try:
while True:
4 changes: 2 additions & 2 deletions airflow/providers/google/cloud/triggers/dataplex.py
@@ -105,7 +105,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": self._convert_to_dict(job)})

def _convert_to_dict(self, job: DataScanJob) -> dict:
"""Returns a representation of a DataScanJob instance as a dict."""
"""Return a representation of a DataScanJob instance as a dict."""
return DataScanJob.to_dict(job)


@@ -187,5 +187,5 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
yield TriggerEvent({"job_id": self.job_id, "job_state": state, "job": self._convert_to_dict(job)})

def _convert_to_dict(self, job: DataScanJob) -> dict:
"""Returns a representation of a DataScanJob instance as a dict."""
"""Return a representation of a DataScanJob instance as a dict."""
return DataScanJob.to_dict(job)
