-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Description
Apache Airflow version
3.1.3
If "Other Airflow 2/3 version" selected, which one?
No response
What happened?
Summary
I am trying to setup a DAG that triggers when messages are dropped into a Valkey queue.
I expect the following to happen.
- I write a message to the Valkey Queue with some JSON in it.
- The Airflow MessageQueueTrigger sees the message in Valkey, and it triggers the DAG.
The following appears to be happening.
- I write a message to the Valkey Queue with some JSON in it.
- Airflow reads the message as binary. It attempts to encode the binary to JSON so that it can write an
asset_eventto the Airflow database. - An exception is thrown because binary cannot be encoded to JSON.
The following are the versions.
- Airflow: 3.1.3
- Running as standalone in the official docker container.
- The following pips are installed.
apache-airflow-providers-redisapache-airflow-providers-common-messagingredis
- Valkey: 9.0
- Running in a separate container.
Extra Information
The part of the Exception that makes me suspect it's an encoding thing is shown below. It appears to be reading the channel and the data as binary, and the exception is saying bytes aren't JSON serializable.
airflow-server | triggerer | sqlalchemy.exc.StatementError: (builtins.TypeError) Object of type bytes is not JSON serializable
airflow-server | triggerer | [SQL: INSERT INTO asset_event (asset_id, extra, source_task_id, source_dag_id, source_run_id, timestamp) VALUES (?, ?, ?, ?, ?, ?)]
airflow-server | triggerer | [parameters: [{'extra': {'from_trigger': True, 'payload': {'type': 'message', 'pattern': None, 'channel': b'test_airflow_queue', 'data': b'{"hello":"world"}'}}, 'asset_id': 1, 'source_run_id': None, 'source_task_id': None, 'source_dag_id': None}]]
The full exception block is the following.
airflow-server | triggerer | 2025-12-09T16:51:35.143456Z [error ] Exception when executing TriggerRunnerSupervisor.run [airflow.jobs.triggerer_job_runner.TriggererJobRunner] loc=triggerer_job_runner.py:173
airflow-server | triggerer | Traceback (most recent call last):
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1810, in _execute_context
airflow-server | triggerer | context = constructor(
airflow-server | triggerer | ^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 1078, in _init_compiled
airflow-server | triggerer | processors[key](compiled_params[key])
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", line 1668, in process
airflow-server | triggerer | return impl_processor(process_param(value, dialect))
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", line 2669, in process
airflow-server | triggerer | serialized = json_serializer(value)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps
airflow-server | triggerer | return _default_encoder.encode(obj)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in encode
airflow-server | triggerer | chunks = self.iterencode(o, _one_shot=True)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode
airflow-server | triggerer | return _iterencode(o, 0)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in default
airflow-server | triggerer | raise TypeError(f'Object of type {o.__class__.__name__} '
airflow-server | triggerer | TypeError: Object of type bytes is not JSON serializable
airflow-server | triggerer | The above exception was the direct cause of the following exception:
airflow-server | triggerer | Traceback (most recent call last):
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 170, in _execute
airflow-server | triggerer | self.trigger_runner.run()
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 531, in run
airflow-server | triggerer | self.handle_events()
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
airflow-server | triggerer | return func(*args, **kwargs)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in handle_events
airflow-server | triggerer | Trigger.submit_event(trigger_id=trigger_id, event=event)
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
airflow-server | triggerer | return func(*args, session=session, **kwargs)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/trigger.py", line 260, in submit_event
airflow-server | triggerer | AssetManager.register_asset_change(
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/assets/manager.py", line 161, in register_asset_change
airflow-server | triggerer | session.flush() # Ensure the event is written earlier than DDRQ entries below.
airflow-server | triggerer | ^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
airflow-server | triggerer | self._flush(objects)
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
airflow-server | triggerer | with util.safe_reraise():
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
airflow-server | triggerer | compat.raise_(
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
airflow-server | triggerer | raise exception
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
airflow-server | triggerer | flush_context.execute()
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
airflow-server | triggerer | rec.execute(self)
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
airflow-server | triggerer | util.preloaded.orm_persistence.save_obj(
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
airflow-server | triggerer | _emit_insert_statements(
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1238, in _emit_insert_statements
airflow-server | triggerer | result = connection._execute_20(
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
airflow-server | triggerer | return meth(self, args_10style, kwargs_10style, execution_options)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
airflow-server | triggerer | return connection._execute_clauseelement(
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
airflow-server | triggerer | ret = self._execute_context(
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1816, in _execute_context
airflow-server | triggerer | self._handle_dbapi_exception(
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
airflow-server | triggerer | util.raise_(
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
airflow-server | triggerer | raise exception
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1810, in _execute_context
airflow-server | triggerer | context = constructor(
airflow-server | triggerer | ^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 1078, in _init_compiled
airflow-server | triggerer | processors[key](compiled_params[key])
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", line 1668, in process
airflow-server | triggerer | return impl_processor(process_param(value, dialect))
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", line 2669, in process
airflow-server | triggerer | serialized = json_serializer(value)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps
airflow-server | triggerer | return _default_encoder.encode(obj)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in encode
airflow-server | triggerer | chunks = self.iterencode(o, _one_shot=True)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode
airflow-server | triggerer | return _iterencode(o, 0)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in default
airflow-server | triggerer | raise TypeError(f'Object of type {o.__class__.__name__} '
airflow-server | triggerer | sqlalchemy.exc.StatementError: (builtins.TypeError) Object of type bytes is not JSON serializable
airflow-server | triggerer | [SQL: INSERT INTO asset_event (asset_id, extra, source_task_id, source_dag_id, source_run_id, timestamp) VALUES (?, ?, ?, ?, ?, ?)]
airflow-server | triggerer | [parameters: [{'extra': {'from_trigger': True, 'payload': {'type': 'message', 'pattern': None, 'channel': b'test_airflow_queue', 'data': b'{"hello":"world"}'}}, 'asset_id': 1, 'source_run_id': None, 'source_task_id': None, 'source_dag_id': None}]]
airflow-server | triggerer | 2025-12-09T16:51:35.148429Z [info ] Waiting for triggers to clean up [airflow.jobs.triggerer_job_runner.TriggererJobRunner] loc=triggerer_job_runner.py:176
airflow-server | triggerer | 2025-12-09T16:51:35.151731Z [info ] Process exited [supervisor] exit_code=-2 loc=supervisor.py:709 pid=37 signal_sent=SIGINT
airflow-server | triggerer | 2025-12-09T16:51:35.151835Z [info ] Exited trigger loop [airflow.jobs.triggerer_job_runner.TriggererJobRunner] loc=triggerer_job_runner.py:181
airflow-server | triggerer | Traceback (most recent call last):
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1810, in _execute_context
airflow-server | triggerer | context = constructor(
airflow-server | triggerer | ^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 1078, in _init_compiled
airflow-server | triggerer | processors[key](compiled_params[key])
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/type_api.py", line 1668, in process
airflow-server | triggerer | return impl_processor(process_param(value, dialect))
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/sqltypes.py", line 2669, in process
airflow-server | triggerer | serialized = json_serializer(value)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/usr/python/lib/python3.12/json/__init__.py", line 231, in dumps
airflow-server | triggerer | return _default_encoder.encode(obj)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 200, in encode
airflow-server | triggerer | chunks = self.iterencode(o, _one_shot=True)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 258, in iterencode
airflow-server | triggerer | return _iterencode(o, 0)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/usr/python/lib/python3.12/json/encoder.py", line 180, in default
airflow-server | triggerer | raise TypeError(f'Object of type {o.__class__.__name__} '
airflow-server | triggerer | TypeError: Object of type bytes is not JSON serializable
airflow-server | triggerer | The above exception was the direct cause of the following exception:
airflow-server | triggerer | Traceback (most recent call last):
airflow-server | triggerer | File "/home/airflow/.local/bin/airflow", line 7, in <module>
airflow-server | triggerer | sys.exit(main())
airflow-server | triggerer | ^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 55, in main
airflow-server | triggerer | args.func(args)
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
airflow-server | triggerer | return func(*args, **kwargs)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 114, in wrapper
airflow-server | triggerer | return f(*args, **kwargs)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 54, in wrapped_function
airflow-server | triggerer | return func(*args, **kwargs)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 69, in triggerer
airflow-server | triggerer | run_command_with_daemon_option(
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
airflow-server | triggerer | callback()
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 72, in <lambda>
airflow-server | triggerer | callback=lambda: triggerer_run(args.skip_serve_logs, args.capacity, triggerer_heartrate),
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/triggerer_command.py", line 55, in triggerer_run
airflow-server | triggerer | run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute)
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
airflow-server | triggerer | return func(*args, session=session, **kwargs)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 368, in run_job
airflow-server | triggerer | return execute_job(job, execute_callable=execute_callable)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 397, in execute_job
airflow-server | triggerer | ret = execute_callable()
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 170, in _execute
airflow-server | triggerer | self.trigger_runner.run()
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 531, in run
airflow-server | triggerer | self.handle_events()
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
airflow-server | triggerer | return func(*args, **kwargs)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in handle_events
airflow-server | triggerer | Trigger.submit_event(trigger_id=trigger_id, event=event)
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 100, in wrapper
airflow-server | triggerer | return func(*args, session=session, **kwargs)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/trigger.py", line 260, in submit_event
airflow-server | triggerer | AssetManager.register_asset_change(
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/airflow/assets/manager.py", line 161, in register_asset_change
airflow-server | triggerer | session.flush() # Ensure the event is written earlier than DDRQ entries below.
airflow-server | triggerer | ^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
airflow-server | triggerer | self._flush(objects)
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
airflow-server | triggerer | with util.safe_reraise():
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
airflow-server | triggerer | compat.raise_(
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
airflow-server | triggerer | raise exception
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
airflow-server | triggerer | flush_context.execute()
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
airflow-server | triggerer | rec.execute(self)
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
airflow-server | triggerer | util.preloaded.orm_persistence.save_obj(
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
airflow-server | triggerer | _emit_insert_statements(
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1238, in _emit_insert_statements
airflow-server | triggerer | result = connection._execute_20(
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
airflow-server | triggerer | return meth(self, args_10style, kwargs_10style, execution_options)
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
airflow-server | triggerer | return connection._execute_clauseelement(
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
airflow-server | triggerer | ret = self._execute_context(
airflow-server | triggerer | ^^^^^^^^^^^^^^^^^^^^^^
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1816, in _execute_context
airflow-server | triggerer | self._handle_dbapi_exception(
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
airflow-server | triggerer | util.raise_(
airflow-server | triggerer | File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
airflow-server | triggerer | raise exception
The following is my DAG file. I expect it to trigger when Redis messages are added and then do an EmptyOperator.
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher, chain
# 1. Define the Trigger
# Set the scheme to 'redis+pubsub' and specify the channel to listen on.
REDIS_CHANNEL = "test_airflow_queue"
redis_trigger = MessageQueueTrigger(
scheme="redis+pubsub",
channels=[REDIS_CHANNEL],
redis_conn_id="redis_shared", # Your Airflow Connection ID
)
# 2. Define the Asset
# The Asset represents the external event source (the Redis queue/channel).
redis_asset = Asset(
name="redis_queue_asset",
watchers=[
AssetWatcher(
name=f"redis_watcher_{REDIS_CHANNEL}",
trigger=redis_trigger,
)
],
)
with DAG(
dag_id="redis_event_driven_dag",
# Set schedule to the redis_asset
schedule=[redis_asset],
) as dag:
# Do nothing for now
EmptyOperator(task_id="dag_did_something")
I am writing the following JSON to the Redis Queue, and I'm encoding it as UTF-8 before I write it.
{"hello":"world"}
I attempted to setup a Redis Connection in the Airflow UI that turns on decoding by adding the following to the Extra Field JSON.
{
"decode_responses": true
}
What you think should happen instead?
Sorry in advance if I'm missing something something that already exists.
I thought one or more of the following should have happened.
- The DAG is triggered and it gets the contents of the queue-message.
As for how it does that, I'm unsure. Some ideas came to mind. And sorry if these already exist.
- Make an option to force Redis connections to always decode.
- Add a callback function called
decode_data_function(or something) to the MessageQueueTrigger (or some related class) that I can implement. I could implement a function that decodes the binary data into something that can be encoded to JSON.
How to reproduce
Steps to reproduce
- Launch the Standalone Airflow in Docker, along with a Redis container.
- Setup a Redis Connection in the Airflow Website
- Set host and port to the host and port of Redis
- I set Extra Fields JSON to the following
- {"decode_responses": true}
- Note, I found decode_responses using AI but I suspect it's hallucinating.
- Create the following DAG
from __future__ import annotations
import pendulum
from airflow.decorators import task
from airflow.providers.common.messaging.triggers.msg_queue import MessageQueueTrigger
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sdk import DAG, Asset, AssetWatcher, chain
# 1. Define the Trigger
# Set the scheme to 'redis+pubsub' and specify the channel to listen on.
REDIS_CHANNEL = "test_airflow_queue"
redis_trigger = MessageQueueTrigger(
scheme="redis+pubsub",
channels=[REDIS_CHANNEL],
redis_conn_id="redis_shared", # Your Airflow Connection ID
)
# 2. Define the Asset
# The Asset represents the external event source (the Redis queue/channel).
redis_asset = Asset(
name="redis_queue_asset",
watchers=[
AssetWatcher(
name=f"redis_watcher_{REDIS_CHANNEL}",
trigger=redis_trigger,
)
],
)
with DAG(
dag_id="redis_event_driven_dag",
# Set schedule to the redis_asset
schedule=[redis_asset],
) as dag:
# Do nothing for now
EmptyOperator(task_id="dag_did_something")
- Turn the DAG on in the Airflow Website
- Drop a message into the Redis Queue
Operating System
Debian GNU/Linux 12 (bookworm)
Versions of Apache Airflow Providers
airflow@airflow-server:/opt/airflow$ pip freeze | grep apache-airflow-providers
apache-airflow-providers-amazon==9.16.0
apache-airflow-providers-celery==3.13.0
apache-airflow-providers-cncf-kubernetes==10.9.0
apache-airflow-providers-common-compat==1.8.0
apache-airflow-providers-common-io==1.6.4
apache-airflow-providers-common-messaging==2.0.0
apache-airflow-providers-common-sql==1.28.2
apache-airflow-providers-docker==4.4.4
apache-airflow-providers-elasticsearch==6.3.4
apache-airflow-providers-fab==3.0.1
apache-airflow-providers-ftp==3.13.2
apache-airflow-providers-git==0.0.9
apache-airflow-providers-google==18.1.0
apache-airflow-providers-grpc==3.8.2
apache-airflow-providers-hashicorp==4.3.3
apache-airflow-providers-http==5.4.0
apache-airflow-providers-microsoft-azure==12.8.0
apache-airflow-providers-mysql==6.3.4
apache-airflow-providers-odbc==4.10.2
apache-airflow-providers-openlineage==2.7.3
apache-airflow-providers-postgres==6.4.0
apache-airflow-providers-redis==4.3.2
apache-airflow-providers-sendgrid==4.1.4
apache-airflow-providers-sftp==5.4.1
apache-airflow-providers-slack==9.4.0
apache-airflow-providers-smtp==2.3.1
apache-airflow-providers-snowflake==6.6.0
apache-airflow-providers-ssh==4.1.5
apache-airflow-providers-standard==1.9.1
Deployment
Docker-Compose
Deployment details
The following are the versions.
- Airflow: 3.1.3
- Running as standalone in the official docker container.
- The following pips are installed.
apache-airflow-providers-redisapache-airflow-providers-common-messagingredis
- Valkey: 9.0
- Running in a separate container.
Anything else?
The problem always occurs, even when I drop an empty string into the Redis queue.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct