Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion airflow/contrib/hooks/gcs_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,23 @@
import warnings

# pylint: disable=unused-import
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook, _parse_gcs_url # noqa
from airflow.gcp.hooks.gcs import GcsHook, _parse_gcs_url # noqa

warnings.warn(
"This module is deprecated. Please use `airflow.gcp.hooks.gcs`.",
DeprecationWarning, stacklevel=2
)


class GoogleCloudStorageHook(GcsHook):
"""
This class is deprecated. Please use `airflow.gcp.hooks.gcs.GcsHook`.
"""

def __init__(self, *args, **kwargs):
warnings.warn(
"This class is deprecated. Please use `airflow.gcp.hooks.gcs.GcsHook`.",
DeprecationWarning, stacklevel=2
)

super().__init__(*args, **kwargs)
6 changes: 3 additions & 3 deletions airflow/contrib/operators/gcs_to_gdrive_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from airflow.contrib.hooks.gdrive_hook import GoogleDriveHook
from airflow.exceptions import AirflowException
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.gcp.hooks.gcs import GcsHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

Expand Down Expand Up @@ -93,12 +93,12 @@ def __init__(
self.move_object = move_object
self.gcp_conn_id = gcp_conn_id
self.delegate_to = delegate_to
self.gcs_hook = None # type: Optional[GoogleCloudStorageHook]
self.gcs_hook = None # type: Optional[GcsHook]
self.gdrive_hook = None # type: Optional[GoogleDriveHook]

def execute(self, context):

self.gcs_hook = GoogleCloudStorageHook(
self.gcs_hook = GcsHook(
google_cloud_storage_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
)
self.gdrive_hook = GoogleDriveHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
Expand Down
4 changes: 2 additions & 2 deletions airflow/contrib/operators/s3_to_gcs_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from airflow.contrib.operators.s3_list_operator import S3ListOperator
from airflow.exceptions import AirflowException
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook, _parse_gcs_url
from airflow.gcp.hooks.gcs import GcsHook, _parse_gcs_url
from airflow.hooks.S3_hook import S3Hook
from airflow.utils.decorators import apply_defaults

Expand Down Expand Up @@ -140,7 +140,7 @@ def execute(self, context):
# use the super method to list all the files in an S3 bucket/key
files = super().execute(context)

gcs_hook = GoogleCloudStorageHook(
gcs_hook = GcsHook(
google_cloud_storage_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to)

Expand Down
2 changes: 1 addition & 1 deletion airflow/gcp/hooks/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from airflow.version import version


class GoogleCloudStorageHook(GoogleCloudBaseHook):
class GcsHook(GoogleCloudBaseHook):
"""
Interact with Google Cloud Storage. This hook uses the Google Cloud Platform
connection.
Expand Down
6 changes: 3 additions & 3 deletions airflow/gcp/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from airflow.exceptions import AirflowException
from airflow.gcp.hooks.bigquery import BigQueryHook
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook, _parse_gcs_url
from airflow.gcp.hooks.gcs import GcsHook, _parse_gcs_url
from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
from airflow.models.taskinstance import TaskInstance
from airflow.operators.check_operator import CheckOperator, IntervalCheckOperator, ValueCheckOperator
Expand Down Expand Up @@ -734,7 +734,7 @@ def execute(self, context):

gcs_bucket, gcs_object = _parse_gcs_url(self.gcs_schema_object)

gcs_hook = GoogleCloudStorageHook(
gcs_hook = GcsHook(
google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to)
schema_fields = json.loads(gcs_hook.download(
Expand Down Expand Up @@ -903,7 +903,7 @@ def execute(self, context):
location=self.location)

if not self.schema_fields and self.schema_object and self.source_format != 'DATASTORE_BACKUP':
gcs_hook = GoogleCloudStorageHook(
gcs_hook = GcsHook(
google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to)
schema_fields = json.loads(gcs_hook.download(
Expand Down
6 changes: 3 additions & 3 deletions airflow/gcp/operators/dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from typing import List, Optional

from airflow.gcp.hooks.dataflow import DataFlowHook
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.gcp.hooks.gcs import GcsHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.version import version
Expand Down Expand Up @@ -422,13 +422,13 @@ def execute(self, context):


class GoogleCloudBucketHelper:
"""GoogleCloudStorageHook helper class to download GCS object."""
"""GcsHook helper class to download GCS object."""
GCS_PREFIX_LENGTH = 5

def __init__(self,
gcp_conn_id: str = 'google_cloud_default',
delegate_to: Optional[str] = None) -> None:
self._gcs_hook = GoogleCloudStorageHook(gcp_conn_id, delegate_to)
self._gcs_hook = GcsHook(gcp_conn_id, delegate_to)

def google_cloud_to_local(self, file_name: str) -> str:
"""
Expand Down
4 changes: 2 additions & 2 deletions airflow/gcp/operators/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

from airflow.exceptions import AirflowException
from airflow.gcp.hooks.dataproc import DataProcHook
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.gcp.hooks.gcs import GcsHook
from airflow.models import BaseOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults
Expand Down Expand Up @@ -1105,7 +1105,7 @@ def _upload_file_temp(self, bucket, local_file):

self.log.info("Uploading %s to %s", local_file, temp_filename)

GoogleCloudStorageHook(
GcsHook(
google_cloud_storage_conn_id=self.gcp_conn_id
).upload(
bucket_name=bucket,
Expand Down
4 changes: 2 additions & 2 deletions airflow/gcp/operators/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from airflow.exceptions import AirflowException
from airflow.gcp.hooks.datastore import DatastoreHook
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.gcp.hooks.gcs import GcsHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

Expand Down Expand Up @@ -94,7 +94,7 @@ def execute(self, context):
self.log.info('Exporting data to Cloud Storage bucket %s', self.bucket)

if self.overwrite_existing and self.namespace:
gcs_hook = GoogleCloudStorageHook(self.cloud_storage_conn_id)
gcs_hook = GcsHook(self.cloud_storage_conn_id)
objects = gcs_hook.list(self.bucket, prefix=self.namespace)
for obj in objects:
gcs_hook.delete(self.bucket, obj)
Expand Down
16 changes: 8 additions & 8 deletions airflow/gcp/operators/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from typing import Dict, Iterable, List, Optional, Union

from airflow import AirflowException
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.gcp.hooks.gcs import GcsHook
from airflow.models import BaseOperator
from airflow.models.xcom import MAX_XCOM_SIZE
from airflow.utils.decorators import apply_defaults
Expand Down Expand Up @@ -130,7 +130,7 @@ def __init__(self,
self.delegate_to = delegate_to

def execute(self, context):
hook = GoogleCloudStorageHook(
hook = GcsHook(
google_cloud_storage_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to
)
Expand Down Expand Up @@ -211,7 +211,7 @@ def __init__(self,

def execute(self, context):

hook = GoogleCloudStorageHook(
hook = GcsHook(
google_cloud_storage_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to
)
Expand Down Expand Up @@ -300,7 +300,7 @@ def __init__(self,
def execute(self, context):
self.log.info('Executing download: %s, %s, %s', self.bucket,
self.object, self.filename)
hook = GoogleCloudStorageHook(
hook = GcsHook(
google_cloud_storage_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to
)
Expand Down Expand Up @@ -373,7 +373,7 @@ def __init__(self,
super().__init__(*args, **kwargs)

def execute(self, context):
hook = GoogleCloudStorageHook(
hook = GcsHook(
google_cloud_storage_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to
)
Expand Down Expand Up @@ -448,7 +448,7 @@ def __init__(
self.gcp_conn_id = gcp_conn_id

def execute(self, context):
hook = GoogleCloudStorageHook(
hook = GcsHook(
google_cloud_storage_conn_id=self.gcp_conn_id
)
hook.insert_bucket_acl(bucket_name=self.bucket, entity=self.entity, role=self.role,
Expand Down Expand Up @@ -519,7 +519,7 @@ def __init__(self,
self.gcp_conn_id = gcp_conn_id

def execute(self, context):
hook = GoogleCloudStorageHook(
hook = GcsHook(
google_cloud_storage_conn_id=self.gcp_conn_id
)
hook.insert_object_acl(bucket_name=self.bucket,
Expand Down Expand Up @@ -580,7 +580,7 @@ def __init__(
self.output_encoding = sys.getdefaultencoding()

def execute(self, context: Dict):
hook = GoogleCloudStorageHook(gcp_conn_id=self.gcp_conn_id)
hook = GcsHook(gcp_conn_id=self.gcp_conn_id)

with NamedTemporaryFile() as source_file, NamedTemporaryFile() as destination_file:
self.log.info("Downloading file from %s", self.source_bucket)
Expand Down
4 changes: 2 additions & 2 deletions airflow/gcp/operators/text_to_speech.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from google.cloud.texttospeech_v1.types import AudioConfig, SynthesisInput, VoiceSelectionParams

from airflow import AirflowException
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.gcp.hooks.gcs import GcsHook
from airflow.gcp.hooks.text_to_speech import GCPTextToSpeechHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
Expand Down Expand Up @@ -129,7 +129,7 @@ def execute(self, context):
)
with NamedTemporaryFile() as temp_file:
temp_file.write(result.audio_content)
cloud_storage_hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.gcp_conn_id)
cloud_storage_hook = GcsHook(google_cloud_storage_conn_id=self.gcp_conn_id)
cloud_storage_hook.upload(
bucket_name=self.target_bucket_name, object_name=self.target_filename, filename=temp_file.name
)
10 changes: 5 additions & 5 deletions airflow/gcp/sensors/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from typing import Callable, List, Optional

from airflow import AirflowException
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.gcp.hooks.gcs import GcsHook
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

Expand Down Expand Up @@ -66,7 +66,7 @@ def __init__(self,

def poke(self, context):
self.log.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
hook = GoogleCloudStorageHook(
hook = GcsHook(
google_cloud_storage_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to)
return hook.exists(self.bucket, self.object)
Expand Down Expand Up @@ -123,7 +123,7 @@ def __init__(self,

def poke(self, context):
self.log.info('Sensor checks existence of : %s, %s', self.bucket, self.object)
hook = GoogleCloudStorageHook(
hook = GcsHook(
google_cloud_storage_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to)
return hook.is_updated_after(self.bucket, self.object, self.ts_func(context))
Expand Down Expand Up @@ -170,7 +170,7 @@ def __init__(self,
def poke(self, context):
self.log.info('Sensor checks existence of objects: %s, %s',
self.bucket, self.prefix)
hook = GoogleCloudStorageHook(
hook = GcsHook(
google_cloud_storage_conn_id=self.google_cloud_conn_id,
delegate_to=self.delegate_to)
self._matches = hook.list(self.bucket, prefix=self.prefix)
Expand Down Expand Up @@ -315,5 +315,5 @@ def is_bucket_updated(self, current_num_objects: int) -> bool:
return False

def poke(self, context):
hook = GoogleCloudStorageHook()
hook = GcsHook()
return self.is_bucket_updated(len(hook.list(self.bucket, prefix=self.prefix)))
4 changes: 2 additions & 2 deletions airflow/gcp/utils/mlengine_operator_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import dill

from airflow.exceptions import AirflowException
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.gcp.hooks.gcs import GcsHook
from airflow.gcp.operators.dataflow import DataFlowPythonOperator
from airflow.gcp.operators.mlengine import MLEngineBatchPredictionOperator
from airflow.operators.python_operator import PythonOperator
Expand Down Expand Up @@ -244,7 +244,7 @@ def apply_validate_fn(*args, **kwargs):
raise ValueError("Wrong format prediction_path: {}".format(prediction_path))
summary = os.path.join(obj.strip("/"),
"prediction.summary.json")
gcs_hook = GoogleCloudStorageHook()
gcs_hook = GcsHook()
summary = json.loads(gcs_hook.download(bucket, summary))
return validate_fn(summary)

Expand Down
4 changes: 2 additions & 2 deletions airflow/operators/adls_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from airflow.contrib.hooks.azure_data_lake_hook import AzureDataLakeHook
from airflow.contrib.operators.adls_list_operator import AzureDataLakeStorageListOperator
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook, _parse_gcs_url
from airflow.gcp.hooks.gcs import GcsHook, _parse_gcs_url
from airflow.utils.decorators import apply_defaults


Expand Down Expand Up @@ -134,7 +134,7 @@ def __init__(self,
def execute(self, context):
# use the super to list all files in an Azure Data Lake path
files = super().execute(context)
g_hook = GoogleCloudStorageHook(
g_hook = GcsHook(
google_cloud_storage_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to)

Expand Down
4 changes: 2 additions & 2 deletions airflow/operators/cassandra_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

from airflow.contrib.hooks.cassandra_hook import CassandraHook
from airflow.exceptions import AirflowException
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.gcp.hooks.gcs import GcsHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

Expand Down Expand Up @@ -217,7 +217,7 @@ def _write_local_schema_file(self, cursor):
return {self.schema_filename: tmp_schema_file_handle}

def _upload_to_gcs(self, files_to_upload):
hook = GoogleCloudStorageHook(
hook = GcsHook(
google_cloud_storage_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to)
for object, tmp_file_handle in files_to_upload.items():
Expand Down
4 changes: 2 additions & 2 deletions airflow/operators/gcs_to_bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from airflow import AirflowException
from airflow.gcp.hooks.bigquery import BigQueryHook
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.gcp.hooks.gcs import GcsHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

Expand Down Expand Up @@ -238,7 +238,7 @@ def execute(self, context):

if not self.schema_fields:
if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
gcs_hook = GoogleCloudStorageHook(
gcs_hook = GcsHook(
google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
delegate_to=self.delegate_to)
schema_fields = json.loads(gcs_hook.download(
Expand Down
6 changes: 3 additions & 3 deletions airflow/operators/gcs_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from typing import Optional

from airflow.exceptions import AirflowException
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
from airflow.gcp.hooks.gcs import GcsHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

Expand Down Expand Up @@ -156,7 +156,7 @@ def __init__(self,

def execute(self, context):

hook = GoogleCloudStorageHook(
hook = GcsHook(
google_cloud_storage_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to
)
Expand Down Expand Up @@ -284,7 +284,7 @@ def __init__(
self.delegate_to = delegate_to

def execute(self, context):
hook = GoogleCloudStorageHook(
hook = GcsHook(
google_cloud_storage_conn_id=self.gcp_conn_id,
delegate_to=self.delegate_to
)
Expand Down
Loading