Open
Description
Checklist
- I have verified that the issue exists against the `main` branch of Celery.
- This has already been asked to the discussions forum first.
- I have read the relevant section in the contribution guide on reporting bugs.
- I have checked the issues list for similar or identical bug reports.
- I have checked the pull requests list for existing proposed fixes.
- I have checked the commit log to find out if the bug was already fixed in the main branch.
- I have included all related issues and possible duplicate issues in this issue (If there are none, check this box anyway).
- I have tried to reproduce the issue with pytest-celery and added the reproduction script below.
Mandatory Debugging Information
- I have included the output of `celery -A proj report` in the issue. (If you are not able to do this, then at least specify the Celery version affected.)
- I have verified that the issue exists against the `main` branch of Celery.
- I have included the contents of `pip freeze` in the issue.
- I have included all the versions of all the external dependencies required to reproduce this bug.
Optional Debugging Information
- I have tried reproducing the issue on more than one Python version and/or implementation.
- I have tried reproducing the issue on more than one message broker and/or result backend.
- I have tried reproducing the issue on more than one version of the message broker and/or result backend.
- I have tried reproducing the issue on more than one operating system.
- I have tried reproducing the issue on more than one workers pool.
- I have tried reproducing the issue with autoscaling, retries, ETA/Countdown & rate limits disabled.
- I have tried reproducing the issue after downgrading and/or upgrading Celery and its dependencies.
Related Issues and Possible Duplicates
Related Issues
- None
Possible Duplicates
- None
Environment & Settings
Celery version:
celery report
Output:
Steps to Reproduce
Required Dependencies
- Minimal Python Version: N/A or Unknown
- Minimal Celery Version: N/A or Unknown
- Minimal Kombu Version: N/A or Unknown
- Minimal Broker Version: N/A or Unknown
- Minimal Result Backend Version: N/A or Unknown
- Minimal OS and/or Kernel Version: N/A or Unknown
- Minimal Broker Client Version: N/A or Unknown
- Minimal Result Backend Client Version: N/A or Unknown
Python Packages
pip freeze
Output:
Other Dependencies
N/A
Minimally Reproducible Test Case
Expected Behavior
Actual Behavior
tasks.py:
def add(a, b):
    """Return ``a + b``.

    NOTE(review): the view in this report calls ``add.delay(1, 2)``, which is
    only available when this function is registered as a Celery task. No task
    decorator is visible on this snippet -- confirm it was not lost when the
    code was pasted.
    """
    return a + b
@shared_task
def agent_response(agent_id, channel, thread_ts, bot_token, query, user_id=None):
    """Run the agent identified by ``agent_id`` on ``query`` and post the answer.

    Args:
        agent_id: Value used to load the ``Agent`` row (``Agent.objects.get(id=...)``).
        channel: Channel the Slack message is posted to.
        thread_ts: Timestamp of the message the reply is threaded under.
        bot_token: Token used to construct the Slack ``WebClient``.
        query: Text handed to the agent executor under the ``"input"`` key.
        user_id: Optional user ID; when truthy the reply is prefixed with a
            ``<@user_id>`` mention.

    Returns:
        The object returned by ``agent_executor.invoke``.

    Raises:
        Any exception raised while loading the agent, invoking it, or calling
        Slack propagates unchanged to the caller.
    """
    logger.info("Creating agent")
    agent = Agent.objects.get(id=agent_id)
    agent_executor = get_agent(agent.integration, agent.credentials)

    response = agent_executor.invoke({"input": query})
    logger.info(response)

    # Adjust according to your actual response structure
    output_text = response["output"]
    if user_id:
        output_text = f"<@{user_id}> {output_text}"

    if agent.integration == ThirdParty.SLACK:
        client = WebClient(token=bot_token)
        # chat.postMessage threads a reply via ``thread_ts``; ``ts`` is the
        # argument of chat.update and does not place the message in a thread.
        client.chat_postMessage(
            channel=channel,
            thread_ts=thread_ts,
            text=output_text,
            # A mrkdwn section block renders the reply as Slack markdown.
            blocks=[
                {"type": "section", "text": {"type": "mrkdwn", "text": output_text}},
            ],
        )
        logger.info("Message send successfully")

    return response
Views.py:
"""Token Lookup"""
permission_classes = [permissions.AllowAny]
def post(self, request: HttpRequest, **kwargs):
    """Handle an incoming Slack request and enqueue the agent task.

    Flow, as implemented below:
      1. Reject requests whose Slack signature does not verify (400).
      2. Reject payload types other than ``url_verification`` and
         ``event_callback`` (400).
      3. Answer ``url_verification`` by echoing the challenge.
      4. For ``event_callback``, look up the ``Bot`` for the event's user and
         team, skip the bot's own messages, and enqueue ``agent_response``.

    Every path after signature/type validation answers with HTTP 200.
    """
    # Verify incoming requests from Slack
    # https://api.slack.com/authentication/verifying-requests-from-slack
    if not signature_verifier.is_valid(
        body=request.body,
        timestamp=request.headers.get("X-Slack-Request-Timestamp"),
        signature=request.headers.get("X-Slack-Signature"),
    ):
        return Response("invalid request", status=status.HTTP_400_BAD_REQUEST)

    data = request.data
    data_type = data.get("type")
    allowed_data_types = [
        "url_verification",
        "event_callback",
    ]
    if data_type not in allowed_data_types:
        return Response("Not Allowed", status=status.HTTP_400_BAD_REQUEST)

    if data_type == "url_verification":
        challenge = data.get("challenge")
        return Response({"challenge": challenge}, status=status.HTTP_200_OK)

    if data_type != "event_callback":
        # Fallback for any allowed type that has no dedicated handling above.
        return Response({"message": "No event"}, status=status.HTTP_200_OK)

    event = data.get("event") or {}
    # The workspace's ID
    team_id = data.get("team_id")
    user_id = event.get("user")
    query = event.get("text")
    channel = event.get("channel")
    thread_ts = event.get("ts")

    # Lookup the stored bot token for this workspace
    try:
        bot = Bot.objects.get(
            user_id=user_id,
            team_id=team_id,
        )
    except Exception:
        # Best-effort: any lookup failure is still answered with 200, but
        # SystemExit/KeyboardInterrupt are no longer swallowed as they were
        # by the previous bare ``except:``.
        return Response(status=status.HTTP_200_OK)

    agent = bot.agent
    bot_token = bot.access_token
    if not bot_token:
        # The app may be uninstalled or be used in a shared channel
        return Response(
            "Please install this app first!", status=status.HTTP_200_OK
        )

    # NOTE(review): looks like a debugging call left in for the reproduction;
    # it enqueues an ``add`` task on every event -- confirm it is still wanted.
    add.delay(1, 2)

    client = WebClient(token=bot_token)
    bot_id = client.api_call("auth.test")["user_id"]

    if event.get("type") == "app_mention":
        blocks = event.get("blocks") or []
        elements = (blocks[0].get("elements") or []) if blocks else []
        # Only override the event-level user/text when both expected elements
        # are present; otherwise keep the values read from the event instead
        # of failing with IndexError.
        if len(elements) >= 2:
            user_id = elements[0].get("user_id")
            query = elements[1].get("text")

    # Ignore bot's own message
    if user_id == bot_id:
        return Response(status=status.HTTP_200_OK)

    print("About to run task")
    # The bot token is a credential, so it is deliberately kept out of the
    # debug output.
    print(
        f"agent_id: {agent.id}\n"
        f"channel: {channel}\n"
        f"query: {query}\n"
        f"thread_ts: {thread_ts}"
    )

    agent_response.apply_async(
        kwargs={
            "agent_id": agent.id,
            "channel": channel,
            "thread_ts": thread_ts,
            "bot_token": bot_token,
            "query": query,
            "user_id": user_id,
        }
    )
    return Response(status=status.HTTP_200_OK)
Response:
[2024-05-14 18:35:56,879: INFO/MainProcess] Task integrations.tasks.agent_response[fd345480-2440-4d37-b9a1-4d5a77195e4c] received
[2024-05-14 18:35:56,913: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)')
billiard.einfo.RemoteTraceback:
"""
Traceback (most recent call last):
File "C:\Users\USER\anaconda3\envs\boilerplate\Lib\site-packages\billiard\pool.py", line 361, in workloop
tasks, accept, hostname = _loc
^^^^^^^^^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 3, got 0)
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Users\USER\anaconda3\envs\boilerplate\Lib\site-packages\billiard\pool.py", line 361, in workloop
result = (True, prepare_result(fun(*args, **kwargs)))
^^^^^^^^^^^^^^^^^^^^
File "C:\Users\USER\anaconda3\envs\boilerplate\Lib\site-packages\celery\app\trace.py", line 664, in fast_trace_task
tasks, accept, hostname = _loc
^^^^^^^^^^^^^^^^^^^^^^^
ValueError: not enough values to unpack (expected 3, got 0)