diff --git a/src/workers.cc b/src/workers.cc index 1756baad..d221a5f6 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -199,11 +199,6 @@ ProducerConnect::ProducerConnect(Nan::Callback *callback, Producer* producer): ProducerConnect::~ProducerConnect() {} void ProducerConnect::Execute() { - // Activate the dispatchers before the connection, as some callbacks may run - // on the background thread. - // We will deactivate them if the connection fails. - producer->ActivateDispatchers(); - Baton b = producer->Connect(); if (b.err() != RdKafka::ERR_NO_ERROR) { @@ -222,6 +217,9 @@ void ProducerConnect::HandleOKCallback() { v8::Local argv[argc] = { Nan::Null(), obj}; + // Activate the dispatchers + producer->ActivateDispatchers(); + callback->Call(argc, argv); } @@ -558,11 +556,6 @@ KafkaConsumerConnect::KafkaConsumerConnect(Nan::Callback *callback, KafkaConsumerConnect::~KafkaConsumerConnect() {} void KafkaConsumerConnect::Execute() { - // Activate the dispatchers before the connection, as some callbacks may run - // on the background thread. - // We will deactivate them if the connection fails. - consumer->ActivateDispatchers(); - Baton b = consumer->Connect(); // consumer->Wait(); @@ -582,6 +575,7 @@ void KafkaConsumerConnect::HandleOKCallback() { Nan::New(consumer->Name()).ToLocalChecked()); v8::Local argv[argc] = { Nan::Null(), obj }; + consumer->ActivateDispatchers(); callback->Call(argc, argv); }