Events added to in-memory queue are not being removed after wait_for_external_event consumes it #481

Open
taoweng1000 opened this issue Jan 26, 2024 · 4 comments
Labels
P2 Priority 2 question

Comments

@taoweng1000

taoweng1000 commented Jan 26, 2024

Hi,

I have a simple program where events are sent to an orchestrator using raise_event. The events on the message queue are never removed after wait_for_external_event is called. Any reason why?


async def getCurrentOrchestrationInstance(orchestrator_name: str, instance_id: str, client: df.DurableOrchestrationClient):
    existing_instance = await client.get_status(instance_id)

    if existing_instance.runtime_status in [df.OrchestrationRuntimeStatus.Completed,
                                            df.OrchestrationRuntimeStatus.Failed,
                                            df.OrchestrationRuntimeStatus.Terminated,
                                            None]:
        instance_id = await client.start_new(orchestrator_name, instance_id)
        logging.info(f"Started orchestration with ID = '{instance_id}'.")
        return instance_id
    else:
        return instance_id

@app.function_name(name="DataProcessor")
@app.event_hub_message_trigger(arg_name="eventHubEvent", event_hub_name="test",
                               connection="EventHubConnectionStringTrigger")
@app.durable_client_input(client_name="client")
async def data_processor(eventHubEvent: func.EventHubEvent, client: df.DurableOrchestrationClient):
    eventhub_event_body = eventHubEvent.get_body()
    event = json.loads(eventhub_event_body)
    instance_id = "HaulCycleOrchestrator"
    instance_id = await getCurrentOrchestrationInstance("Orchestrator", instance_id, client)
    await client.raise_event(instance_id, "get_input", event)

Orchestrator

@app.function_name(name="Orchestrator")
@app.orchestration_trigger(context_name="context")
def orchestrator(context: df.DurableOrchestrationContext):
    cache = context.get_input()
    while True:
        wait_event = yield context.wait_for_external_event('get_input')
        logging.info("wait_event received")
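
To make the expected behavior concrete, here is a minimal toy model of the pattern above, using only the Python standard library. It is not the Durable Functions runtime and says nothing about how that runtime stores events: `ToyRuntime`, its `pending` queue, and `toy_orchestrator` are hypothetical names invented for illustration. The sketch only shows the behavior the question assumes, namely that each raised event is taken off the queue once the waiting generator consumes it.

```python
from collections import deque
from typing import Any, Deque, Generator, List


class ToyRuntime:
    """Hypothetical stand-in for a runtime; NOT the Durable Functions API."""

    def __init__(self, orchestrator) -> None:
        self.pending: Deque[Any] = deque()  # events raised but not yet consumed
        self.consumed: List[Any] = []       # events handed to the orchestrator
        self._gen: Generator = orchestrator(self.consumed)
        next(self._gen)                     # run up to the first wait (yield)

    def raise_event(self, payload: Any) -> None:
        """Enqueue an event, then deliver queued events to the waiting generator."""
        self.pending.append(payload)
        while self.pending:
            event = self.pending.popleft()  # the event leaves the queue on delivery
            self._gen.send(event)           # resumes the orchestrator at its yield


def toy_orchestrator(log: List[Any]):
    # Same shape as the loop above: wait for an event, handle it, wait again.
    while True:
        event = yield "get_input"
        log.append(event)


runtime = ToyRuntime(toy_orchestrator)
runtime.raise_event({"id": 1})
runtime.raise_event({"id": 2})
print(len(runtime.pending), runtime.consumed)
```

In this toy model the queue is empty after both events are delivered, which is the outcome the question expects from the real orchestrator.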

@bachuv
Contributor

bachuv commented Jan 27, 2024

Hi @taoweng1000, I will try to reproduce this issue and get back to you soon. Thanks!

@taoweng1000
Author

Hi @bachuv, thank you very much for your help. I really appreciate it.
I'm on Python 3.10, azure-functions 1.18.0, azure-functions-durable 1.2.8
Core Tools Version: 4.0.5455 Commit hash: N/A (64-bit)
Function Runtime Version: 4.27.5.21554
Please let me know if you need any additional information.

@taoweng1000
Author

Hi @bachuv, I have a favor to ask. I need to demo Azure Durable Functions to my team by Thursday this week. Could you give an update before then on whether this is a bug, a missing feature, or a misconfiguration? Thank you very much for your help.

@lilyjma

lilyjma commented Mar 4, 2024

Hi @taoweng1000 - thank you for using Durable Functions! I'm a PM working on DF and would love to learn about your experience using the product. You can share your feedback in this quick survey to help influence what the team works on next. If you're building intelligent apps, there's also an opportunity to participate in a compensated UX study. Thanks!
