updated _write_args on PythonVirtualenvOperator #8256
airflow/operators/python_operator.py
else:
    serializer = pickle
# some args from context can't be loaded in virtual env
invalid_args = set(['dag', 'task', 'ti'])
Why is it not possible to load `ti`?
TI is a SQLAlchemy model. It very likely contains references to other objects through the `task` attribute.
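To make the point about object references concrete, here is a minimal standard-library sketch. `FakeTask` and `FakeTaskInstance` are hypothetical stand-ins, not Airflow classes, and a `threading.Lock` plays the role of whatever live resource the real object graph might reach.

```python
import pickle
import threading


class FakeTask:
    """Hypothetical task that owns a live, unpicklable resource."""

    def __init__(self):
        # Stand-in for a connection, session, or similar handle.
        self.lock = threading.Lock()


class FakeTaskInstance:
    """Hypothetical stand-in for a model row that points at its task."""

    def __init__(self, task_id):
        self.task_id = task_id
        # Pickling the instance drags this reference along with it.
        self.task = FakeTask()


def pickles_cleanly(value):
    """Return True when pickle can serialize the whole object graph of value."""
    try:
        pickle.dumps(value)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True
```

Under these assumptions, a plain dictionary such as `{"task_id": "t"}` pickles cleanly, while the instance that reaches a lock through `task` does not.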
Thanks, mik-laj. This means that XComs still don't work for PythonVirtualenvOperator, right? Can I inject the system site-packages into the virtualenv to get it working?
This is slightly problematic. Technically this is possible, but you would have to create the TaskInstance object from scratch. This could be simple code that only retrieves the object using `session.query(TaskInstance)`.
I see, it's tricky. Thanks for the detailed answers. I'll try to retrieve the TaskInstance or Xcom directly as a workaround.
@jloehel We have not encountered any cases where we needed XCom, so I'm not familiar with it. Let me know if you want me to make any modifications to this code, or whether you'll be making changes on your side.
Maybe I am using it wrong but I don't get the idea why it is an anti-pattern to exchange data between tasks. I am using this feature really often. I will make the changes on my side but I would love to see your PR merged soon. Thanks a lot for your work :-)
@jloehel I don't mind if the PR isn't merged, as long as PythonVirtualenvOp gets fixed; I think that will help a lot of people. Feel free to send me an e-mail if you would like to follow up in case I need to modify that function again. It should be in my profile. Cheers :)
This helped me, thanks @maganaluis.
XCom isn't likely to be a problem, as you don't pass XCom rows/objects around, but instead the value stored inside that row.
It looks like a good fix. I would have expected more people to have encountered this issue. A couple of things:
- I don't think [AIRFLOW-5390] Remove provide context #6074 has been cherry-picked back to the 1.10 branch.
- I've been a bit out of the loop; are we still using the `v1-10-test` branch?
- Maybe add a test? This will make sure that it won't happen in the future.
airflow/operators/python_operator.py
for key, value in self.op_kwargs.items():
    try:
        serializer.loads(serializer.dumps(value))
        if key not in invalid_args:
Why skip the invalid arguments here? It would make more sense to me to not serialize the set above at all.
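One way to read this suggestion is to check the excluded names before attempting any serialization. The sketch below is a standalone illustration under that assumption, not the operator's actual code; `select_kwargs` and `INVALID_ARGS` are hypothetical names.

```python
import pickle

# Context keys named in the diff as not loadable inside the virtualenv.
INVALID_ARGS = frozenset({"dag", "task", "ti"})


def select_kwargs(op_kwargs, serializer=pickle, invalid_args=INVALID_ARGS):
    """Return the kwargs that are allowed and survive a serializer round trip."""
    kept = {}
    for key, value in op_kwargs.items():
        if key in invalid_args:
            # Excluded by name: skip it without serializing it at all.
            continue
        try:
            serializer.loads(serializer.dumps(value))
        except Exception:  # kept broad here because the serializer is pluggable
            continue
        kept[key] = value
    return kept
```

With this ordering, the round-trip check only runs for keys that could actually be passed on.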
airflow/operators/python_operator.py
"Exception %s found while serializing argument
object: %s on op_kwargs key %s ...skipping..."
""" % (e, value, key)
self.log.debug(msg)
I would raise this level. If you try to pass something and it is silently dropped, that would be very confusing. What do you think? Any idea how often this happens?
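As a rough sketch of what raising the level could look like, the helper below logs at WARNING whenever an argument is dropped. The logger name and the function `keep_or_warn` are hypothetical and only illustrate the idea.

```python
import logging
import pickle

# Hypothetical logger name, used only for this sketch.
log = logging.getLogger("virtualenv_args_sketch")


def keep_or_warn(key, value, serializer=pickle):
    """Return True if value round-trips; otherwise warn that key is dropped."""
    try:
        serializer.loads(serializer.dumps(value))
        return True
    except Exception as exc:
        # WARNING is visible with default logging settings, unlike DEBUG.
        log.warning("Dropping op_kwargs key %r: not serializable (%s)", key, exc)
        return False
```

A user who passes an unserializable value would then see a message in the task log without enabling debug output.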
airflow/operators/python_operator.py
        serializer.loads(serializer.dumps(value))
        if key not in invalid_args:
            kwargs[key] = value
    except Exception as e:
`Exception` is a bit broad; can we be more specific here?
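A narrower clause might look like the sketch below. The tuple `PICKLE_ERRORS` is an assumption that covers what the standard `pickle` module typically raises when dumping; another serializer such as dill may raise different exceptions, so treat this as a starting point only.

```python
import pickle

# Assumed set of exceptions for the standard pickle module when dumping.
PICKLE_ERRORS = (pickle.PicklingError, TypeError, AttributeError)


def round_trip_error(value):
    """Return the exception raised by a pickle round trip, or None on success."""
    try:
        pickle.loads(pickle.dumps(value))
    except PICKLE_ERRORS as exc:
        return exc
    return None
```

Any exception outside the tuple would propagate instead of being swallowed, which makes unexpected failures visible.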
Small issues; apart from that, LGTM.
Thank you @Fokko, I switched those to use `create_session` in a `with` statement.
@maganaluis This only applies to 1.10.x, correct, and isn't a problem with the code in master? Is it just because in master we don't pass all arguments, only the ones that the caller asks for?
with create_session() as session:
    session.query(DagRun).delete()
    session.query(TI).delete()
    session.commit()
For consistency with other tests please use the helpers from https://github.com/apache/airflow/blob/v1-10-stable/tests/test_utils/db.py
t = PythonVirtualenvOperator(task_id='config_dag_run', dag=self.dag,
                             provide_context=True,
                             python_callable=pass_function)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
Does doing this create a whole virtual environment? That takes quite a while, doesn't it?
Could we perhaps just test the one function (`_write_args`) instead, if that is the case?
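Testing only the argument-writing step could look roughly like the sketch below. `write_args` and `read_args` are hypothetical stand-ins that mimic writing a pickled dictionary of arguments to a file; they are not the operator's real methods, and the file name is arbitrary.

```python
import os
import pickle
import tempfile


def write_args(path, op_args, op_kwargs, serializer=pickle):
    """Hypothetical stand-in for the operator's argument-writing step."""
    with open(path, "wb") as handle:
        serializer.dump({"args": op_args, "kwargs": op_kwargs}, handle)


def read_args(path, serializer=pickle):
    """Load what write_args stored, as a script in the virtualenv might."""
    with open(path, "rb") as handle:
        return serializer.load(handle)


def round_trip_through_file(op_args, op_kwargs):
    """Write and re-read arguments in a temp directory; no virtualenv involved."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "script.in")
        write_args(path, op_args, op_kwargs)
        return read_args(path)
```

A test built this way exercises serialization only, so it avoids the cost of creating a virtual environment, at the price of not covering the end-to-end run.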
This test runs in about 2 seconds; that's not too slow.
This appears not to have run the main unit tests either. Please rebase, @maganaluis.
@ashb I squashed the commits into one. Rebasing has a different effect; to be honest, this is the first time I've used this feature, but it's great. Regarding master vs v1-10-stable: yes, master behaves differently, so I don't believe it is an issue there. We'll find out more once we move to 2.0 later this year.
@feluelle Can you look at it? This change has been open for a long time and it would be nice to move it forward.
Hey @maganaluis :) I am currently on refactoring the Best Regards,
I see you're merging to master, so you're taking care of this issue in 2.0; my PR is to solve the issue on 1.10.10 stable. I think both of them can be merged: since 1.10.10 was already released, this will fix the issue in case anyone wants to install directly from the branch. I'll check out your fork and start testing Airflow from master :)
…che#7276) If a task is skipped by BranchPythonOperator, BaseBranchOperator or ShortCircuitOperator and the user then clears the skipped task later, it'll execute. This is probably not the right behaviour. This commit changes that so it will be skipped again. This can be ignored by running the task again with "Ignore Task Deps" override. (cherry picked from commit 1cdab56)
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Hey @maganaluis, I got more requests to make this fix happen in 1.10.x. The code looks good to me in general, but is it possible that you can rebase on the latest v1.10 branch?
a656fb7 to d576f6c
@feluelle Sorry for the late response, should be up to date now.
(cherry-picked from commit 4aa05a7)
For some reason the
@feluelle The static checks are failing. I ran them with Breeze, but it skips them since there are no file changes locally. Do you know how to force Breeze to run the static checks?
You can manually run it via See: https://github.com/apache/airflow/blob/master/BREEZE.rst#running-static-checks
Or you can soft-reset your last commit and re-run it.
I see. @potiuk do you have an idea why the CI can't find Python? master is using
Please rebase to the latest v1-10 branch, @maganaluis. There was a bug I fixed recently: the pre-commit cache was shared between the master and v1-10 branches, and they used different versions of Python. This is supposed to be fixed in the latest branch.
This change will ensure that provide_context=True works on v1.10.10 for PythonVirtualenvOperator
switched log message to debug on _write_args
Clean up code and simplified the solution to ignore just the unserializable items
adding pytest for pythonvirtualenvoperator
this test ensures that we can use dag_run from context
Switch to test utils
Switched the session clear run and clear dags to test util functions.
d576f6c to 5fd4a7f
Looks like the static checks and core tests passed. I see some issues on the Kubernetes tests, though they seem unrelated. I'm not able to replicate those locally.
Awesome work, congrats on your first merged pull request!
…8256) This change will ensure that provide_context=True works on v1.10.12 for PythonVirtualenvOperator Clean up code and simplified the solution to ignore just the unserializable items adding pytest for PythonVirtualenvOperator
This change will ensure that argument provide_context=True works on v1.10.10 for PythonVirtualenvOperator
This lets the user use the configuration that is passed at runtime through the UI, CLI or REST API.
Related bug: #8177
Make sure to mark the boxes below before creating PR: [x]
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.