Create a Custom pandas compatible Dataframe class #1319
Codecov Report
Base: 95.05% // Head: 95.71% // Increases project coverage by +0.65%
Additional details and impacted files
@@ Coverage Diff @@
## main #1319 +/- ##
==========================================
+ Coverage 95.05% 95.71% +0.65%
==========================================
Files 72 19 -53
Lines 3400 677 -2723
Branches 391 68 -323
==========================================
- Hits 3232 648 -2584
+ Misses 102 18 -84
+ Partials 66 11 -55
Great implementation! You have covered a lot of ground in this PR 👍
Create a pandas-compatible Dataframe class that has `serialize` and `deserialize` methods needed by Airflow 2.5's BaseXCom class
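
A minimal sketch of what such a class might look like, assuming the `serialize()` / `deserialize(data, version)` convention that Airflow 2.5 documents for custom objects passed through XCom. The class name `XComDataFrame`, the `version` value, and the choice of the `split` JSON orientation are illustrative assumptions, not the code from this PR.

```python
import json
from typing import ClassVar

import pandas as pd


class XComDataFrame(pd.DataFrame):
    """Hypothetical pandas subclass that can round-trip through a plain dict."""

    # Assumed serialization version; bump it if the payload layout changes.
    version: ClassVar[int] = 1

    @property
    def _constructor(self):
        # Keep pandas operations returning the subclass instead of DataFrame.
        return XComDataFrame

    def serialize(self) -> dict:
        # Go through to_json so the dict holds only JSON-native values.
        return json.loads(self.to_json(orient="split"))

    @staticmethod
    def deserialize(data: dict, version: int) -> "XComDataFrame":
        if version > XComDataFrame.version:
            raise TypeError(f"unsupported serialization version: {version}")
        return XComDataFrame(
            data["data"], index=data["index"], columns=data["columns"]
        )


# Round trip: DataFrame -> dict -> DataFrame.
frame = XComDataFrame({"a": [1, 2], "b": ["x", "y"]})
payload = frame.serialize()
restored = XComDataFrame.deserialize(payload, XComDataFrame.version)
```

The point of the sketch is that `serialize` returns a `dict` rather than a `str`, so the stored value can be wrapped and unwrapped by a JSON-based XCom backend.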
There was a bug in #1319 where I was converting Dataframe to str, but Airflow's XCom expects it to be a dict.

This meant that when I tried to check Airflow's Xcom page, I saw the following error in Webserver logs:

```
dev-airflow-webserver-1 | [2022-11-30 15:22:34,709] {app.py:1741} ERROR - Exception on /xcom/list/ [GET]
dev-airflow-webserver-1 | Traceback (most recent call last):
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask/app.py", line 2525, in wsgi_app
dev-airflow-webserver-1 |     response = self.full_dispatch_request()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask/app.py", line 1822, in full_dispatch_request
dev-airflow-webserver-1 |     rv = self.handle_user_exception(e)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask/app.py", line 1820, in full_dispatch_request
dev-airflow-webserver-1 |     rv = self.dispatch_request()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask/app.py", line 1796, in dispatch_request
dev-airflow-webserver-1 |     return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask_appbuilder/security/decorators.py", line 133, in wraps
dev-airflow-webserver-1 |     return f(self, *args, **kwargs)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask_appbuilder/views.py", line 554, in list
dev-airflow-webserver-1 |     widgets = self._list()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask_appbuilder/baseviews.py", line 1164, in _list
dev-airflow-webserver-1 |     widgets = self._get_list_widget(
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask_appbuilder/baseviews.py", line 1063, in _get_list_widget
dev-airflow-webserver-1 |     count, lst = self.datamodel.query(
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask_appbuilder/models/sqla/interface.py", line 471, in query
dev-airflow-webserver-1 |     query_results = query.all()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 2759, in all
dev-airflow-webserver-1 |     return self._iter().all()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 1361, in all
dev-airflow-webserver-1 |     return self._allrows()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 400, in _allrows
dev-airflow-webserver-1 |     rows = self._fetchall_impl()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 1274, in _fetchall_impl
dev-airflow-webserver-1 |     return self._real_result._fetchall_impl()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 1686, in _fetchall_impl
dev-airflow-webserver-1 |     return list(self.iterator)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/loading.py", line 151, in chunks
dev-airflow-webserver-1 |     rows = [proc(row) for row in fetch]
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/loading.py", line 151, in <listcomp>
dev-airflow-webserver-1 |     rows = [proc(row) for row in fetch]
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/loading.py", line 984, in _instance
dev-airflow-webserver-1 |     state.manager.dispatch.load(state, context)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/event/attr.py", line 343, in __call__
dev-airflow-webserver-1 |     fn(*args, **kw)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/mapper.py", line 3600, in _event_on_load
dev-airflow-webserver-1 |     instrumenting_mapper._reconstructor(state.obj())
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 125, in init_on_load
dev-airflow-webserver-1 |     self.value = self.orm_deserialize_value()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 672, in orm_deserialize_value
dev-airflow-webserver-1 |     return BaseXCom._deserialize_value(self, True)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 654, in _deserialize_value
dev-airflow-webserver-1 |     return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/json/__init__.py", line 359, in loads
dev-airflow-webserver-1 |     return cls(**kw).decode(s)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/json/decoder.py", line 337, in decode
dev-airflow-webserver-1 |     obj, end = self.raw_decode(s, idx=_w(s, 0).end())
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/json/decoder.py", line 353, in raw_decode
dev-airflow-webserver-1 |     obj, end = self.scan_once(s, idx)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/airflow/utils/json.py", line 255, in orm_object_hook
dev-airflow-webserver-1 |     for k, v in data.items():
dev-airflow-webserver-1 | AttributeError: 'str' object has no attribute 'items'
```
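
The failure in the last frames of this traceback can be reproduced in isolation. The sketch below is a simplified stand-in for a decoder hook that iterates over the serialized payload; it is not Airflow's actual `orm_object_hook`, and the envelope keys and function names are assumptions made for illustration.

```python
import json

# Hypothetical envelope keys wrapping a serialized custom object.
CLASSNAME = "__classname__"
DATA = "__data__"


def wrap(classname, payload):
    """Store a serialized payload inside a JSON envelope."""
    return json.dumps({CLASSNAME: classname, DATA: payload})


def simplified_hook(dct):
    """Simplified stand-in for a hook that expects the payload to be a dict."""
    if CLASSNAME not in dct:
        return dct
    data = dct[DATA]
    # Raises AttributeError when serialize() produced a str instead of a dict.
    return {key: value for key, value in data.items()}


def try_decode(raw):
    """Return the decoded payload, or the error message if decoding fails."""
    try:
        return json.loads(raw, object_hook=simplified_hook)
    except AttributeError as exc:
        return f"AttributeError: {exc}"


# A dict payload decodes cleanly; a str payload reproduces the error.
dict_result = try_decode(wrap("Frame", {"columns": ["a"], "data": [[1]]}))
str_result = try_decode(wrap("Frame", '{"columns": ["a"], "data": [[1]]}'))
```

Under these assumptions, `dict_result` is the original dict, while `str_result` carries the same `'str' object has no attribute 'items'` message seen in the webserver log.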
There was a bug in #1319 where I was converting Dataframe to str, but Airflow's XCom expects it to be a dict as described in http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com/docs/apache-airflow/latest/concepts/taskflow.html#passing-arbitrary-objects-as-arguments

This meant that when I tried to check Airflow's Xcom page, I saw the following error in Webserver logs:

```
dev-airflow-webserver-1 | [2022-11-30 15:22:34,709] {app.py:1741} ERROR - Exception on /xcom/list/ [GET]
dev-airflow-webserver-1 | Traceback (most recent call last):
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask/app.py", line 2525, in wsgi_app
dev-airflow-webserver-1 |     response = self.full_dispatch_request()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask/app.py", line 1822, in full_dispatch_request
dev-airflow-webserver-1 |     rv = self.handle_user_exception(e)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask/app.py", line 1820, in full_dispatch_request
dev-airflow-webserver-1 |     rv = self.dispatch_request()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask/app.py", line 1796, in dispatch_request
dev-airflow-webserver-1 |     return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask_appbuilder/security/decorators.py", line 133, in wraps
dev-airflow-webserver-1 |     return f(self, *args, **kwargs)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask_appbuilder/views.py", line 554, in list
dev-airflow-webserver-1 |     widgets = self._list()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask_appbuilder/baseviews.py", line 1164, in _list
dev-airflow-webserver-1 |     widgets = self._get_list_widget(
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask_appbuilder/baseviews.py", line 1063, in _get_list_widget
dev-airflow-webserver-1 |     count, lst = self.datamodel.query(
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask_appbuilder/models/sqla/interface.py", line 471, in query
dev-airflow-webserver-1 |     query_results = query.all()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 2759, in all
dev-airflow-webserver-1 |     return self._iter().all()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 1361, in all
dev-airflow-webserver-1 |     return self._allrows()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 400, in _allrows
dev-airflow-webserver-1 |     rows = self._fetchall_impl()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 1274, in _fetchall_impl
dev-airflow-webserver-1 |     return self._real_result._fetchall_impl()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 1686, in _fetchall_impl
dev-airflow-webserver-1 |     return list(self.iterator)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/loading.py", line 151, in chunks
dev-airflow-webserver-1 |     rows = [proc(row) for row in fetch]
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/loading.py", line 151, in <listcomp>
dev-airflow-webserver-1 |     rows = [proc(row) for row in fetch]
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/loading.py", line 984, in _instance
dev-airflow-webserver-1 |     state.manager.dispatch.load(state, context)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/event/attr.py", line 343, in __call__
dev-airflow-webserver-1 |     fn(*args, **kw)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/mapper.py", line 3600, in _event_on_load
dev-airflow-webserver-1 |     instrumenting_mapper._reconstructor(state.obj())
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 125, in init_on_load
dev-airflow-webserver-1 |     self.value = self.orm_deserialize_value()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 672, in orm_deserialize_value
dev-airflow-webserver-1 |     return BaseXCom._deserialize_value(self, True)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 654, in _deserialize_value
dev-airflow-webserver-1 |     return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/json/__init__.py", line 359, in loads
dev-airflow-webserver-1 |     return cls(**kw).decode(s)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/json/decoder.py", line 337, in decode
dev-airflow-webserver-1 |     obj, end = self.raw_decode(s, idx=_w(s, 0).end())
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/json/decoder.py", line 353, in raw_decode
dev-airflow-webserver-1 |     obj, end = self.scan_once(s, idx)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/airflow/utils/json.py", line 255, in orm_object_hook
dev-airflow-webserver-1 |     for k, v in data.items():
dev-airflow-webserver-1 | AttributeError: 'str' object has no attribute 'items'
```
This is an attempt to create a pandas-compatible Dataframe class that has `serialize` and `deserialize` methods needed by Airflow 2.5's BaseXCom class (cherry picked from commit a7308e1)
There was a bug in #1319 where I was converting Dataframe to str, but Airflow's XCom expects it to be a dict as described in http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com/docs/apache-airflow/latest/concepts/taskflow.html#passing-arbitrary-objects-as-arguments

This meant that when I tried to check Airflow's Xcom page, I saw the following error in Webserver logs:

```
dev-airflow-webserver-1 | [2022-11-30 15:22:34,709] {app.py:1741} ERROR - Exception on /xcom/list/ [GET]
dev-airflow-webserver-1 | Traceback (most recent call last):
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask/app.py", line 2525, in wsgi_app
dev-airflow-webserver-1 |     response = self.full_dispatch_request()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask/app.py", line 1822, in full_dispatch_request
dev-airflow-webserver-1 |     rv = self.handle_user_exception(e)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask/app.py", line 1820, in full_dispatch_request
dev-airflow-webserver-1 |     rv = self.dispatch_request()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask/app.py", line 1796, in dispatch_request
dev-airflow-webserver-1 |     return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask_appbuilder/security/decorators.py", line 133, in wraps
dev-airflow-webserver-1 |     return f(self, *args, **kwargs)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask_appbuilder/views.py", line 554, in list
dev-airflow-webserver-1 |     widgets = self._list()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask_appbuilder/baseviews.py", line 1164, in _list
dev-airflow-webserver-1 |     widgets = self._get_list_widget(
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask_appbuilder/baseviews.py", line 1063, in _get_list_widget
dev-airflow-webserver-1 |     count, lst = self.datamodel.query(
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/flask_appbuilder/models/sqla/interface.py", line 471, in query
dev-airflow-webserver-1 |     query_results = query.all()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 2759, in all
dev-airflow-webserver-1 |     return self._iter().all()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 1361, in all
dev-airflow-webserver-1 |     return self._allrows()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 400, in _allrows
dev-airflow-webserver-1 |     rows = self._fetchall_impl()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 1274, in _fetchall_impl
dev-airflow-webserver-1 |     return self._real_result._fetchall_impl()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/engine/result.py", line 1686, in _fetchall_impl
dev-airflow-webserver-1 |     return list(self.iterator)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/loading.py", line 151, in chunks
dev-airflow-webserver-1 |     rows = [proc(row) for row in fetch]
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/loading.py", line 151, in <listcomp>
dev-airflow-webserver-1 |     rows = [proc(row) for row in fetch]
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/loading.py", line 984, in _instance
dev-airflow-webserver-1 |     state.manager.dispatch.load(state, context)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/event/attr.py", line 343, in __call__
dev-airflow-webserver-1 |     fn(*args, **kw)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/mapper.py", line 3600, in _event_on_load
dev-airflow-webserver-1 |     instrumenting_mapper._reconstructor(state.obj())
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 125, in init_on_load
dev-airflow-webserver-1 |     self.value = self.orm_deserialize_value()
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 672, in orm_deserialize_value
dev-airflow-webserver-1 |     return BaseXCom._deserialize_value(self, True)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/airflow/models/xcom.py", line 654, in _deserialize_value
dev-airflow-webserver-1 |     return json.loads(result.value.decode("UTF-8"), cls=XComDecoder, object_hook=object_hook)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/json/__init__.py", line 359, in loads
dev-airflow-webserver-1 |     return cls(**kw).decode(s)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/json/decoder.py", line 337, in decode
dev-airflow-webserver-1 |     obj, end = self.raw_decode(s, idx=_w(s, 0).end())
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/json/decoder.py", line 353, in raw_decode
dev-airflow-webserver-1 |     obj, end = self.scan_once(s, idx)
dev-airflow-webserver-1 |   File "/usr/local/lib/python3.9/site-packages/airflow/utils/json.py", line 255, in orm_object_hook
dev-airflow-webserver-1 |     for k, v in data.items():
dev-airflow-webserver-1 | AttributeError: 'str' object has no attribute 'items'
```

(cherry picked from commit 53d6bb7)
This is an attempt to create a pandas-compatible Dataframe class that has `serialize` and `deserialize` methods needed by Airflow 2.5's BaseXCom class