-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix four bugs in StackdriverTaskHandler #13784
Conversation
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*. |
@@ -188,7 +193,7 @@ def read( | |||
if next_page_token: | |||
new_metadata['next_page_token'] = next_page_token | |||
|
|||
return [messages], [new_metadata] | |||
return [((self.task_instance_hostname, messages),)], [new_metadata] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed... interesting way of reurning data :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed... List[Tuple[Tuple[str, List]]]
😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tuple[List[Tuple[Tuple[str, str]]], List[Dict[str, str]]]
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease. |
@@ -35,10 +35,21 @@ def _create_list_response(messages, token): | |||
return mock.MagicMock(pages=(n for n in [page]), next_page_token=token) | |||
|
|||
|
|||
def _remove_stackdriver_handlers(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is another small fix. An error is generated when exiting the process, but it did not cause any errors, but only noise in the log.
========================================== 9 failed, 8 passed, 3 skipped, 7 errors in 12.91s ==========================================
[2021-01-20 19:52:29,766] {_metadata.py:104} WARNING - Compute Engine Metadata server unavailable onattempt 1 of 3. Reason: timed out
[2021-01-20 19:52:29,769] {_metadata.py:104} WARNING - Compute Engine Metadata server unavailable onattempt 2 of 3. Reason: [Errno 111] Connection refused
[2021-01-20 19:52:29,774] {_metadata.py:104} WARNING - Compute Engine Metadata server unavailable onattempt 3 of 3. Reason: [Errno 111] Connection refused
[2021-01-20 19:52:29,774] {_default.py:246} WARNING - Authentication failed using Compute Engine authentication due to unavailable metadata server.
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/logging/__init__.py", line 1946, in shutdown
h.close()
File "/opt/airflow/airflow/providers/google/cloud/log/stackdriver_task_handler.py", line 345, in close
self._transport.flush()
File "/usr/local/lib/python3.6/site-packages/cached_property.py", line 36, in __get__
value = obj.__dict__[self.func.__name__] = self.func(obj)
File "/opt/airflow/airflow/providers/google/cloud/log/stackdriver_task_handler.py", line 120, in _transport
return self.transport_type(self._client, self.name)
File "/usr/local/lib/python3.6/site-packages/cached_property.py", line 36, in __get__
value = obj.__dict__[self.func.__name__] = self.func(obj)
File "/opt/airflow/airflow/providers/google/cloud/log/stackdriver_task_handler.py", line 108, in _client
key_path=self.gcp_key_path, scopes=self.scopes, disable_logging=True
File "/opt/airflow/airflow/providers/google/cloud/utils/credentials_provider.py", line 309, in get_credentials_and_project_id
return _CredentialProvider(*args, **kwargs).get_credentials_and_project()
File "/opt/airflow/airflow/providers/google/cloud/utils/credentials_provider.py", line 242, in get_credentials_and_project
credentials, project_id = self._get_credentials_using_adc()
File "/opt/airflow/airflow/providers/google/cloud/utils/credentials_provider.py", line 295, in _get_credentials_using_adc
credentials, project_id = google.auth.default(scopes=self.scopes)
File "/usr/local/lib/python3.6/site-packages/google/auth/_default.py", line 356, in default
raise exceptions.DefaultCredentialsError(_HELP_MESSAGE)
google.auth.exceptions.DefaultCredentialsError: Could not automatically determine credentials. Please set GOOGLE_APPLICATION_CREDENTIALS or explicitly create credentials and re-run the application. For more information, please see https://cloud.google.com/docs/authentication/getting-started
@@ -252,6 +256,8 @@ def _read_logs( | |||
log_filter=log_filter, page_token=next_page_token | |||
) | |||
messages.append(new_messages) | |||
if not messages: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stackdriver sometimes falls into an endless loop of blank pages.
f28c55f
to
0944b98
Compare
@@ -129,6 +130,8 @@ def test_should_read_logging_configuration(self): | |||
assert "stackdriver" in text | |||
|
|||
def tearDown(self) -> None: | |||
for handler_ref in logging._handlerList[:]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this test, we used the StackdriverTaskHandler which tries to connect to GCP in the close()
method. To avoid this, I delete the handlers manually without calling this method. Similar to: https://github.com/apache/airflow/pull/13784/files#r561283952
Unfortunately this organization is in a bit more serious condition and I found a few bugs that prevented the use of this integration.
The commit ac943c9#diff-e7f34f73940eb52d92bb991abedc1c963431c5373c12dff739c8fb7d03e93d3aR181 changed the output type for the
read
method. Unfortunately, this change did not update this handler as well, so the attempt to read the entries was unsuccessful.The flush method was missing, with the result that at times one or two of the last entries were not saved. It is worth adding that the official implementation of Stackdriver Logging Handler does not have this method either. https://github.com/googleapis/python-logging/blob/master/google/cloud/logging_v2/handlers/handlers.py
I also changed the parameters for calling the
entries.list
method, which I have the impression that they work better. They follow the defaults forgcloud
.Close: #13494
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.