Revert #27878 (#27968) (#27970)
* Merge in master

* Remove changes.md callout
damccorm committed Aug 11, 2023
1 parent 8caa7ce commit 50e2495
Showing 27 changed files with 5,758 additions and 537 deletions.
1 change: 0 additions & 1 deletion CHANGES.md
@@ -59,7 +59,6 @@
 
 ## I/Os
 
-* Python GCSIO is now implemented with GCP GCS Client instead of apitools ([#25676](https://github.com/apache/beam/issues/25676))
 * Java KafkaIO now supports picking up topics via topicPattern ([#26948](https://github.com/apache/beam/pull/26948))
 * Support for read from Cosmos DB Core SQL API ([#23604](https://github.com/apache/beam/issues/23604))
 * Upgraded to HBase 2.5.5 for HBaseIO. (Java) ([#27711](https://github.com/apache/beam/issues/19554))
@@ -4790,6 +4790,10 @@ GBKTransform:
   - from_runner_api_parameter
   - to_runner_api_parameter
 GcpTestIOError: {}
+GcsDownloader:
+  methods:
+  - get_range
+  - size
 GCSFileSystem:
   methods:
   - checksum
@@ -4833,6 +4837,10 @@ GcsIOError: {}
 GcsIOOverrides:
   methods:
   - retry_func
+GcsUploader:
+  methods:
+  - finish
+  - put
 GeneralPurposeConsumerSet:
   methods:
   - flush
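The two new entries above extend the tracked public API surface: the GCS client rewrite adds GcsDownloader and GcsUploader, which plug into Beam's filesystem I/O through the Downloader/Uploader interfaces in apache_beam.io.filesystemio. A minimal sketch of that contract, assuming those standard interfaces; this is an in-memory illustration, not the Beam implementation:

```python
# A minimal sketch, not the Beam implementation: an in-memory Downloader /
# Uploader pair satisfying the same interface the new GcsDownloader and
# GcsUploader entries above expose (get_range/size and put/finish).
import io

from apache_beam.io.filesystemio import Downloader
from apache_beam.io.filesystemio import Uploader


class InMemoryDownloader(Downloader):
  def __init__(self, data):
    self._data = data  # bytes standing in for a GCS blob

  @property
  def size(self):
    return len(self._data)

  def get_range(self, start, end):
    # Ranged reads over [start, end), the way blob downloads are chunked.
    return self._data[start:end]


class InMemoryUploader(Uploader):
  def __init__(self, store, path):
    self._store, self._path = store, path
    self._buffer = io.BytesIO()

  def put(self, data):
    # `data` arrives as a memoryview from Beam's UploaderStream.
    self._buffer.write(bytes(data))

  def finish(self):
    # Commit the object only when the stream closes, as a real
    # resumable upload would.
    self._store[self._path] = self._buffer.getvalue()
```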
@@ -177,7 +177,6 @@ def format_user_score_sums(user_score):
     (user, score) = user_score
     return 'user: %s, total_score: %s' % (user, score)
 
-
   ( # pylint: disable=expression-not-assigned
       p
       | 'ReadInputText' >> beam.io.ReadFromText(args.input)
7 changes: 2 additions & 5 deletions sdks/python/apache_beam/internal/gcp/auth.py
@@ -111,9 +111,6 @@ def __getattr__(self, attr):
     """Delegate attribute access to underlying google-auth credentials."""
     return getattr(self._google_auth_credentials, attr)
 
-  def get_google_auth_credentials(self):
-    return self._google_auth_credentials
-
 
 class _Credentials(object):
   _credentials_lock = threading.Lock()
@@ -122,7 +119,7 @@ class _Credentials(object):
 
   @classmethod
   def get_service_credentials(cls, pipeline_options):
-    # type: (PipelineOptions) -> Optional[_ApitoolsCredentialsAdapter]
+    # type: (PipelineOptions) -> Optional[google.auth.credentials.Credentials]
     with cls._credentials_lock:
       if cls._credentials_init:
         return cls._credentials
@@ -142,7 +139,7 @@ def get_service_credentials(cls, pipeline_options):
 
   @staticmethod
   def _get_service_credentials(pipeline_options):
-    # type: (PipelineOptions) -> Optional[_ApitoolsCredentialsAdapter]
+    # type: (PipelineOptions) -> Optional[google.auth.credentials.Credentials]
     if not _GOOGLE_AUTH_AVAILABLE:
       _LOGGER.warning(
           'Unable to find default credentials because the google-auth library '
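The substance of this change is the pair of type-comment updates: get_service_credentials now hands back the native google-auth credentials rather than the apitools adapter. A sketch of what the new return type supports, assuming the google-auth library is installed; the helper name below is illustrative, not Beam code:

```python
# Illustrative sketch: a google.auth.credentials.Credentials, as promised by
# the updated type comments, refreshes itself against a transport and can be
# passed directly to google-cloud clients.
import google.auth
import google.auth.transport.requests
from google.auth.exceptions import DefaultCredentialsError


def default_credentials_or_none():
  try:
    credentials, _project = google.auth.default(
        scopes=['https://www.googleapis.com/auth/cloud-platform'])
  except DefaultCredentialsError:
    # Mirrors Beam's behavior of returning None when no credentials exist.
    return None
  credentials.refresh(google.auth.transport.requests.Request())
  return credentials
```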
4 changes: 1 addition & 3 deletions sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -827,7 +827,6 @@ def test_streaming_inserts_flush_on_byte_size_limit(self, mock_insert):
           exception_type=exceptions.ServiceUnavailable if exceptions else None,
           error_message='backendError')
       ])
-  @unittest.skip('Not compatible with new GCS client. See GH issue #26334.')
   def test_load_job_exception(self, exception_type, error_message):
 
     with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
@@ -867,7 +866,6 @@ def test_load_job_exception(self, exception_type, error_message):
           exception_type=exceptions.InternalServerError if exceptions else None,
           error_message='internalError'),
       ])
-  @unittest.skip('Not compatible with new GCS client. See GH issue #26334.')
   def test_copy_load_job_exception(self, exception_type, error_message):
 
     from apache_beam.io.gcp import bigquery_file_loads
@@ -886,7 +884,7 @@ def test_copy_load_job_exception(self, exception_type, error_message):
         mock.patch.object(BigQueryWrapper,
                           'wait_for_bq_job'), \
         mock.patch('apache_beam.io.gcp.internal.clients'
-                   '.storage.storage_v1_client.StorageV1.ObjectsService'),\
+                   '.storage.storage_v1_client.StorageV1.ObjectsService'), \
         mock.patch('time.sleep'), \
         self.assertRaises(Exception) as exc, \
         beam.Pipeline() as p:
4 changes: 0 additions & 4 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -339,9 +339,6 @@ class BigQueryWrapper(object):
   offer a common place where retry logic for failures can be controlled.
   In addition, it offers various functions used both in sources and sinks
   (e.g., find and create tables, query a table, etc.).
-
-  Note that client parameter in constructor is only for testing purposes and
-  should not be used in production code.
   """
 
   # If updating following names, also update the corresponding pydocs in
@@ -356,7 +353,6 @@ def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
     self.gcp_bq_client = client or gcp_bigquery.Client(
         client_info=ClientInfo(
             user_agent="apache-beam-%s" % apache_beam.__version__))
-
     self._unique_row_id = 0
     # For testing scenarios where we pass in a client we do not want a
     # randomized prefix for row IDs.
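The context lines show how the wrapper identifies itself: the google-cloud-bigquery client is constructed with a ClientInfo whose user agent embeds the Beam version. The same pattern in isolation, with an illustrative version string:

```python
# Standalone sketch of the User-Agent pinning pattern used in the constructor
# above; the version string here is illustrative.
from google.api_core.client_info import ClientInfo
from google.cloud import bigquery

client = bigquery.Client(
    client_info=ClientInfo(user_agent='apache-beam-2.50.0'))
# Every request this client issues now carries that agent string, which is
# what the new test_user_agent_insert_all test below asserts on.
```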
18 changes: 18 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
@@ -60,6 +60,7 @@
   from apitools.base.py.exceptions import HttpError, HttpForbiddenError
   from google.api_core.exceptions import ClientError, DeadlineExceeded
   from google.api_core.exceptions import InternalServerError
+  import google.cloud
 except ImportError:
   ClientError = None
   DeadlineExceeded = None
@@ -223,6 +224,23 @@ def test_delete_dataset_retries_for_timeouts(self, patched_time_sleep):
       wrapper._delete_dataset('', '')
     self.assertTrue(client.datasets.Delete.called)
 
+  @unittest.skipIf(
+      google and not hasattr(google.cloud, '_http'),  # pylint: disable=c-extension-no-member
+      'Dependencies not installed')
+  @mock.patch('time.sleep', return_value=None)
+  @mock.patch('google.cloud._http.JSONConnection.http')
+  def test_user_agent_insert_all(self, http_mock, patched_sleep):
+    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper()
+    try:
+      wrapper._insert_all_rows('p', 'd', 't', [{'name': 'any'}], None)
+    except:  # pylint: disable=bare-except
+      # Ignore errors. The errors come from the fact that we did not mock
+      # the response from the API, so the overall insert_all_rows call fails
+      # soon after the BQ API is called.
+      pass
+    call = http_mock.request.mock_calls[-2]
+    self.assertIn('apache-beam-', call[2]['headers']['User-Agent'])
+
   @mock.patch('time.sleep', return_value=None)
   def test_delete_table_retries_for_timeouts(self, patched_time_sleep):
     client = mock.Mock()
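The new test's assertion leans on unittest.mock internals: each entry of mock_calls unpacks to a (name, args, kwargs) triple, so call[2]['headers'] reads the keyword arguments of the recorded HTTP request. A self-contained illustration:

```python
# Self-contained illustration of the mock_calls indexing used above.
from unittest import mock

http = mock.Mock()
http.request(
    'GET', 'https://example.invalid',
    headers={'User-Agent': 'apache-beam-2.50.0'})

# A call record is a (name, args, kwargs) triple; index 2 is the kwargs dict.
name, args, kwargs = http.request.mock_calls[-1]
assert 'apache-beam-' in kwargs['headers']['User-Agent']
```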
33 changes: 25 additions & 8 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -254,13 +254,24 @@ def rename(self, source_file_names, destination_file_names):
         gcs_batches.append(gcs_current_batch)
 
     # Execute GCS renames if any and return exceptions.
-    try:
-      for batch in gcs_batches:
-        self._gcsIO().copy_batch(batch)
-      self._gcsIO().delete_batch(source_file_names)
+    exceptions = {}
+    for batch in gcs_batches:
+      copy_statuses = self._gcsIO().copy_batch(batch)
+      copy_succeeded = []
+      for src, dest, exception in copy_statuses:
+        if exception:
+          exceptions[(src, dest)] = exception
+        else:
+          copy_succeeded.append((src, dest))
+      delete_batch = [src for src, dest in copy_succeeded]
+      delete_statuses = self._gcsIO().delete_batch(delete_batch)
+      for i, (src, exception) in enumerate(delete_statuses):
+        dest = copy_succeeded[i][1]
+        if exception:
+          exceptions[(src, dest)] = exception
 
-    except Exception as exception:
-      raise BeamIOError("Rename operation failed", exception)
+    if exceptions:
+      raise BeamIOError("Rename operation failed", exceptions)
 
   def exists(self, path):
     """Check if the provided path exists on the FileSystem.
@@ -329,7 +340,8 @@ def metadata(self, path):
     """
     try:
       file_metadata = self._gcsIO()._status(path)
-      return FileMetadata(path, file_metadata['size'], file_metadata['updated'])
+      return FileMetadata(
+          path, file_metadata['size'], file_metadata['last_updated'])
     except Exception as e:  # pylint: disable=broad-except
       raise BeamIOError("Metadata operation failed", {path: e})
 
@@ -348,7 +360,12 @@ def _delete_path(path):
       else:
         path_to_use = path
       match_result = self.match([path_to_use])[0]
-      self._gcsIO().delete_batch([m.path for m in match_result.metadata_list])
+      statuses = self._gcsIO().delete_batch(
+          [m.path for m in match_result.metadata_list])
+      # pylint: disable=used-before-assignment
+      failures = [e for (_, e) in statuses if e is not None]
+      if failures:
+        raise failures[0]
 
     exceptions = {}
     for path in paths:
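The rewritten rename replaces all-or-nothing exception handling with per-file bookkeeping: copy failures are recorded, only successfully copied sources are deleted, and one BeamIOError carries every (src, dest) failure. A distilled, standalone sketch of that pattern; the batch client here is hypothetical, assumed to return per-item statuses the way the GcsIO methods above do:

```python
# Distilled sketch of the aggregation pattern, against a hypothetical client
# whose copy_batch/delete_batch return per-item (..., error) statuses.
from apache_beam.io.filesystem import BeamIOError


def rename_all(client, pairs):
  exceptions = {}
  succeeded = []
  for src, dest, err in client.copy_batch(pairs):
    if err:
      exceptions[(src, dest)] = err
    else:
      succeeded.append((src, dest))
  # Delete only the sources whose copy succeeded.
  delete_statuses = client.delete_batch([src for src, _ in succeeded])
  for (src, err), (_, dest) in zip(delete_statuses, succeeded):
    if err:
      exceptions[(src, dest)] = err
  if exceptions:
    # Partial failures surface as one error with per-pair details, instead
    # of aborting the whole batch on the first exception.
    raise BeamIOError('Rename operation failed', exceptions)
```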
17 changes: 13 additions & 4 deletions sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -272,16 +272,25 @@ def test_rename_error(self, mock_gcsio):
         'gs://bucket/to2',
         'gs://bucket/to3',
     ]
-    gcsio_mock.delete_batch.side_effect = Exception("BadThings")
+    exception = IOError('Failed')
+    gcsio_mock.delete_batch.side_effect = [[(f, exception) for f in sources]]
+    gcsio_mock.copy_batch.side_effect = [[
+        ('gs://bucket/from1', 'gs://bucket/to1', None),
+        ('gs://bucket/from2', 'gs://bucket/to2', None),
+        ('gs://bucket/from3', 'gs://bucket/to3', None),
+    ]]
 
-    # Issue batch rename.
-    with self.assertRaisesRegex(BeamIOError, r'^Rename operation failed'):
+    expected_results = {
+        (s, d): exception
+        for s, d in zip(sources, destinations)
+    }
+
+    # Issue batch rename.
+    with self.assertRaisesRegex(BeamIOError,
+                                r'^Rename operation failed') as error:
       self.fs.rename(sources, destinations)
+    self.assertEqual(error.exception.exception_details, expected_results)
 
     gcsio_mock.copy_batch.assert_called_once_with([
         ('gs://bucket/from1', 'gs://bucket/to1'),
@@ -299,7 +308,7 @@ def test_delete(self, mock_gcsio):
     # Prepare mocks.
     gcsio_mock = mock.MagicMock()
     gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
-    gcsio_mock._status.return_value = {'size': 0, 'updated': 99999.0}
+    gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0}
     files = [
         'gs://bucket/from1',
         'gs://bucket/from2',
@@ -317,7 +326,7 @@ def test_delete_error(self, mock_gcsio):
     gcsfilesystem.gcsio.GcsIO = lambda pipeline_options=None: gcsio_mock
     exception = IOError('Failed')
     gcsio_mock.delete_batch.side_effect = exception
-    gcsio_mock._status.return_value = {'size': 0, 'updated': 99999.0}
+    gcsio_mock._status.return_value = {'size': 0, 'last_updated': 99999.0}
     files = [
         'gs://bucket/from1',
         'gs://bucket/from2',
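The updated test pins down the new contract: a failed rename raises a single BeamIOError whose exception_details maps each (src, dest) pair to its error. A hypothetical caller-side sketch of consuming those details; the paths are illustrative:

```python
# Hypothetical caller-side sketch: inspect per-pair errors after a failed
# rename instead of losing them to a single opaque exception.
from apache_beam.io.filesystem import BeamIOError
from apache_beam.io.filesystems import FileSystems

try:
  FileSystems.rename(['gs://bucket/a'], ['gs://bucket/b'])
except BeamIOError as e:
  for (src, dest), err in e.exception_details.items():
    print('rename failed: %s -> %s: %s' % (src, dest, err))
```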