Add valid spark on k8s airflow connection urls to tests #31376
airflow/models/connection.py
Outdated
    uri = [e for e in uri]
    # replace back :__ with ://
    uri[1] = uri[1].replace(":__", "://")
    uri = SplitResult(uri[0], uri[1], uri[2], uri[3], uri[4])
Why do you need to do this? `uri` is already a `SplitResult` instance.
Thanks for your comment. No: on line 200 we do a list comprehension, which turns the value into a `list`.
Verified by adding a few log statements:
    from urllib.parse import SplitResult, urlsplit

    def split_uri_to_parts(uri):
        c = uri.count("://")
        if c < 2:
            return urlsplit(uri), False
        idx = uri.rfind("://")
        # replace :// with :__
        uri_list = list(uri)
        uri_list[idx : idx + 3] = ":__"
        uri = "".join(uri_list)
        uri = urlsplit(uri)
        uri = [e for e in uri]
        # replace back :__ with ://
        uri[1] = uri[1].replace(":__", "://")
        print(type(uri))
        uri = SplitResult(uri[0], uri[1], uri[2], uri[3], uri[4])
        print(type(uri))
        return uri, True
Output is:
<class 'list'>
<class 'urllib.parse.SplitResult'>
(SplitResult(scheme='spark', netloc='k8s://100.68.0.1:443', path='', query='deploy-mode=cluster', fragment=''), True)
And why do you need that list comprehension in the first place? `SplitResult` supports index access out of the box.
Hello, I am trying to reassign one element of the `SplitResult`. It does not support item assignment, so I converted it to a list, reassigned the element, and then converted it back to a `SplitResult`. @uranusjr
/Users/adesai/.pyenv/versions/airflow-env/bin/python /Users/adesai/Documents/OpenSource/Airflow/airflow/airflow/models/test.py
Traceback (most recent call last):
  File "/Users/adesai/Documents/OpenSource/Airflow/airflow/airflow/models/test.py", line 24, in <module>
    parts = split_uri_to_parts("spark://k8s://100.68.0.1:443?deploy-mode=cluster")
  File "/Users/adesai/Documents/OpenSource/Airflow/airflow/airflow/models/test.py", line 15, in split_uri_to_parts
    uri[1] = uri[1].replace(":__", "://")
TypeError: 'SplitResult' object does not support item assignment
You should use `_replace`. See the documentation on `namedtuple`.
Thank you! Checked it out and it is brilliant. Pushing a commit with the fix.
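For readers following the thread, here is a minimal sketch of what the `_replace` suggestion could look like when applied to the function quoted above. It is an illustration only, not the code that was pushed: the masking with `:__` is taken from the snippet in this conversation, and the string slicing is an assumed simplification of the original list manipulation.

```python
from __future__ import annotations

from urllib.parse import SplitResult, urlsplit


def split_uri_to_parts(uri: str) -> tuple[SplitResult, bool]:
    """Split a URI, tolerating one nested "://" in the netloc (sketch)."""
    if uri.count("://") < 2:
        return urlsplit(uri), False
    idx = uri.rfind("://")
    # Temporarily mask the last "://" with ":__", as in the quoted snippet.
    masked = uri[:idx] + ":__" + uri[idx + 3 :]
    parts = urlsplit(masked)
    # SplitResult is a namedtuple, so it is immutable; _replace returns a
    # new SplitResult with only the named field changed.
    parts = parts._replace(netloc=parts.netloc.replace(":__", "://"))
    return parts, True
```

This avoids both the list comprehension and the manual `SplitResult(...)` reconstruction, because the value never stops being a `SplitResult`.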
This is not a valid URI to begin with. You should escape the second `://`.
Oh, I wasn't aware of that. Made the changes accordingly.
@uranusjr can you take a look at this PR once you have some time? I have addressed your review comments.
airflow/models/connection.py
Outdated
@@ -186,16 +186,43 @@ def _normalize_conn_type(conn_type):
        conn_type = conn_type.replace("-", "_")
        return conn_type

    @staticmethod
    def split_uri_to_parts(uri):
        c = uri.count("://")
Is it ever possible for this to return >=2 if the escape syntax is used…? A valid URL can never contain more than one `://` sequence.
Good point, since `spark://k8s://100.68.0.1:443?deploy-mode=cluster` is not a valid URL, as you mentioned earlier. I think the logic in this PR is to have a fallback mechanism in case such a scenario occurs.
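To make the point about escaping concrete, the sketch below percent-encodes the nested Kubernetes master URL so that the whole connection URI contains a single `://`, then parses it with the standard library alone. The helper names `build_spark_k8s_uri` and `parse_host` are hypothetical and exist only for this illustration; only the `urllib.parse` calls are real APIs, and nothing here is taken from Airflow's own code.

```python
from urllib.parse import quote, unquote, urlsplit


def build_spark_k8s_uri(master: str, port: int, query: str = "") -> str:
    """Percent-encode the nested master URL (hypothetical helper)."""
    # safe="" forces ":" and "/" to be encoded as %3A and %2F.
    uri = f"spark://{quote(master, safe='')}:{port}"
    return f"{uri}?{query}" if query else uri


def parse_host(uri: str) -> str:
    """Return the decoded host of an escaped URI (hypothetical helper)."""
    # Note: SplitResult.hostname lowercases its value before we decode it.
    return unquote(urlsplit(uri).hostname)


uri = build_spark_k8s_uri("k8s://100.68.0.1", 443, "deploy-mode=cluster")
print(uri)
print(uri.count("://"), parse_host(uri), urlsplit(uri).port)
```

With the inner `://` escaped, `uri.count("://")` is 1, so a plain `urlsplit` call is enough and no fallback branch is needed for this input.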
In normal cases where the format is fine, the existing logic (without what this PR adds) seems to work. As far as this PR is concerned, I think we can get by with just adding test cases in `test_connection.py`. What do you think, @uranusjr?
Yeah, makes sense. It could also be a good idea to add something in the Connection form to emit an error if the user enters an invalid URL. Probably cleaner to do in a separate PR though.
@uranusjr do you mean on the UI? I am not sure if we have a check for that, it would be great to do that though.
As far as this PR is concerned, I will undo all the changes and just add the test cases, that way we have confidence that the actual code for this works.
I think this can be done in `ConnectionForm` (see the WTForms documentation on adding validators). We should probably do that on the `host` field.
Nice. Let me come up with a separate issue and PR for that :)
@uranusjr reworked the PR and added valid examples for the same. I have also changed the PR title.
This PR is marked as closing #21638, but it only changes the tests.
@amoghrajesh I'll take a look to see how this bug can be fixed.
I can work with you on that. This PR should be closed, as it does not close the linked issue.
That is OK, just don't mark the PR as closing the issue, since it does not.
Removed that portion from the description.
@amoghrajesh, I'm currently working on the fix. Let's wait before merging this PR to see if we need to add more tests.
@hussein-awala a related issue that might be interesting or help you with the fix: #31460
@hussein-awala can we merge this one in after we merge the dependent PR?
@hussein-awala can we merge this PR too, as the dependent PR has been merged recently?
--------- Co-authored-by: Amogh <adesai@cloudera.com>
Airflow connections parse valid URLs fine. The issue described in #21638 passed an invalid URL containing a double `://` as input to the connection, which led to parsing issues. This PR simply adds a few valid Spark on k8s connection URLs to the existing test suite. Tested by adding several unit test cases to cover the breadth of inputs.