New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Class-based actor #9

Closed
rakanalh opened this Issue Nov 13, 2017 · 16 comments

Comments

3 participants
@rakanalh
Contributor

rakanalh commented Nov 13, 2017

Hello,

I've been looking for a celery alternative for quite a while, thanks for taking the time to make this.

A suggestion i'd like to make is to add the ability of defining class-based actor where grouping common functionality among different "tasks" or "actors" is possible.

Thank you.

@rakanalh

This comment has been minimized.

Contributor

rakanalh commented Nov 13, 2017

It seems like the actor decorator instantiates an Actor instance after validating queue name and fetching broker instance. Which means that Actor can be extended but super().__init__ has to receive the proper values (https://github.com/Bogdanp/dramatiq/blob/master/dramatiq/actor.py#L67)

@rakanalh rakanalh closed this Nov 13, 2017

@Bogdanp

This comment has been minimized.

Owner

Bogdanp commented Nov 14, 2017

Hi! Thank you for the kind words. Out of curiosity, can you show me a minimal example of how you were thinking this would work?

@rakanalh rakanalh reopened this Nov 14, 2017

@rakanalh

This comment has been minimized.

Contributor

rakanalh commented Nov 14, 2017

As i realized what i described above is not the case, here's what i am trying to do:

class CoreTask(Actor):
    def __init__(self, queue_name):
        super().__init__(
            fn=self,
            broker=get_broker(),
            actor_name=self.__class__.__name__,
            queue_name=queue_name,
            priority=0,
            options={}
        )


class ServiceTask(CoreTask):
    abstract = True

    JOB_TYPE = ServiceJob.JOB_UPDATE

    def __init__(self, queue_name):
        super().__init__(queue_name)

    def __call__(self, user, service, job=None, *args, **kwargs):
        self._assign_user(user)
        self._assign_service(service)
        self._assign_job(job)

        result = self.execute(job=self.job, *args, **kwargs)

        self.job.finished_at = timezone.now()
        self.job.save()

        return result

Any class-based task that i have basically will extend ServiceTask to receive user & service parameters and then create a job record in the database and call execute on the task so that the child task has all info required to complete successfully.

What i realized is that, declaring tasks such that:

class MyTask(ServiceTask):
    def __init__(self):
        super().__init__(queue_name='queue1')

Won't cut it because when the worker starts, the task has to be instantiated to be recognized by the broker via declare_actor. A workaround to this might be to add:

MyTask()

At the very end of the file but that doesn't look convenient. Other ideas could be to call get_broker().declare_actor(MyTask()) but that would be redundant given that the Actor class does that.

Any other ideas about how this could be done other than the above?

@Bogdanp

This comment has been minimized.

Owner

Bogdanp commented Nov 14, 2017

How about something like this?

import dramatiq


class actor(type):
    def __new__(cls, name, bases, attrs):
        clazz = super().__new__(cls, name, bases, attrs)
        abstract = attrs.get("abstract", False)
        if not abstract:
            return dramatiq.actor(clazz())
        return clazz


class Actor(metaclass=actor):
    abstract = True

    @property
    def __name__(self):
        return type(self).__name__

    def __call__(self, *args, **kwargs):
        return self.perform(*args, **kwargs)

    def perform(self):
        raise NotImplementedError


class CoreTask(Actor):
    abstract = True

    def get_task_name(self):
        raise NotImplementedError

    def perform(self):
        print(f"Performing task {self.get_task_name()}")


class FooTask(CoreTask):
    def get_task_name(self):
        return "Foo"


class BarTask(CoreTask):
    def get_task_name(self):
        return "Bar"


if __name__ == "__main__":
    FooTask.send()
    BarTask.send()
@rakanalh

This comment has been minimized.

Contributor

rakanalh commented Nov 15, 2017

Yea this looks pretty good actually. Would this be something we can have in dramatiq core?

@Bogdanp Bogdanp self-assigned this Nov 15, 2017

@Bogdanp Bogdanp added the enhancement label Nov 15, 2017

@Bogdanp

This comment has been minimized.

Owner

Bogdanp commented Nov 15, 2017

Yup, I think this is a useful addition so I'll add it for 0.13.

@Bogdanp Bogdanp added this to the 0.13 milestone Nov 15, 2017

@Bogdanp

This comment has been minimized.

Owner

Bogdanp commented Nov 15, 2017

This'll be a part of 0.13 https://dramatiq.io/reference.html#dramatiq.GenericActor . I'll leave this issue open until it's released.

@rakanalh

This comment has been minimized.

Contributor

rakanalh commented Nov 15, 2017

Excellent, looking forward to 0.13

@Bogdanp

This comment has been minimized.

Owner

Bogdanp commented Nov 15, 2017

Released!

@Bogdanp Bogdanp closed this Nov 15, 2017

@MaxRichter

This comment has been minimized.

MaxRichter commented Jun 18, 2018

Hi Bogdanp,

how can I pass variables to my class?
I understand that the Actor's perform(self) function is called, but this does not accept any parameters.

Meaning when I call the FooTask.send() as stated in your example, I want to do the following: FooTask.send(param_1='test', param_2=20).

Is this possible with the GenericActor?

@Bogdanp

This comment has been minimized.

Owner

Bogdanp commented Jun 18, 2018

Hi @MaxRichter, when you subclass GenericActor, you are expected to override the perform method. At that point you can have your perform method take any parameters you like and those params will be relayed by send:

class Adder(GenericActor):
  def perform(self, a, b):
    print(a + b)

Adder.send(1, 2)
@MaxRichter

This comment has been minimized.

MaxRichter commented Jun 18, 2018

Hi @Bogdanp,

thank you for the quick answer. This is working for me although I am getting the following warning:

Signature of method 'Adder.perform()' does not match signature of base method in class 'GenericActor'.

I executed the case you provided - maybe I understood the concept of subclasses wrong or can I ignore this warning?

@Bogdanp

This comment has been minimized.

Owner

Bogdanp commented Jun 18, 2018

Where are you getting that from? mypy? If so, you can ignore it.

@MaxRichter

This comment has been minimized.

MaxRichter commented Jun 18, 2018

I am getting this from PyCharm IDE.

By the way, is there an option to do "store_results=True" in GenericActor similar as in the function definition @dramatiq.actor(store_results=True)

@Bogdanp

This comment has been minimized.

Owner

Bogdanp commented Jun 18, 2018

Yes, you can provide options like that via the inner Meta class.

@MaxRichter

This comment has been minimized.

MaxRichter commented Jun 18, 2018

Great, works! Thank you a lot for the hints.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment