# How to run this?

`Handler` is a component to implement the logic. The easiest way to run it is with an `Executor`

In [5]:
import os

import logging

os.environ['PROJECT'] = 'my_shiny_project'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/cfg/creds.json'

logging.basicConfig(level=logging.INFO)

logging.getLogger('happyly').setLevel(logging.INFO)
logging.getLogger('google').enabled = True

In [1]:
from happyly.handling import Handler
from happyly.listening import Executor


class MyHandler(Handler):

    def handle(self, _):
        print('Hello, world')

    def on_handling_failed(self, _, error: Exception):
        # notice that you have to explicitly handle errors, otherwise class will not be instantiated
        print(repr(error))


if __name__ == '__main__':
    executor = Executor(
        handler=MyHandler(),
    )

    executor.run()

Hello, world


# Executor? Handler? Why can't I just call my function!
As it is shown further, these are super extensible components for message handling. If you are being impatient, happyly provides high-level components. Let's use the same handler with a very easy to use `GoogleSimpleReceiver`:

In [28]:
import marshmallow
from marshmallow import fields

from happyly.google_pubsub import GoogleSimpleReceiver
from happyly.handling import Handler


class InputSchema(marshmallow.Schema):
    request_id = fields.Str(required=True)


class MyHandler(Handler):

    def handle(self, message):
        pass
#         print('Hello, world')
#         print(f'Request id is {message["request_id"]}')

    def on_handling_failed(self, message, error: Exception):
        print(repr(error))


if __name__ == '__main__':
    # auth setup here if needed

    future = GoogleSimpleReceiver(
        handler=MyHandler(),
        input_schema=InputSchema(strict=True),
        from_subscription='happyly_testing_01',
        project=os.environ['PROJECT'],
    ).start_listening()

    try:
        future.result()
    except KeyboardInterrupt:
        future.cancel()

INFO:happyly.google_pubsub.subscribers:Starting to listen to happyly_testing_01


`GoogleSimpleReceiver` subscribes via Google Pub/Sub subscription and listens for new messages infinitely. Usage of this component is so simple because of default assumptions: that you would use UTF-8 encoded JSON format in `message.data` for messaging and that your schema contains "request_id". But you can use any custom serializers and deserizers, as well as custom subscribers and publishers for any Pub/Sub tool (not only Google's). Notice how `Handler` operates on message attributes in the form of `dict`. `Handler` contains only logic and doesn't need to be changed if you change serialization format or Pub/Sub technology.

# Ok, here is the Sub. Is there some similar Pub component?
Sure! Check out `GoogleSimpleSender`!

In [6]:
import marshmallow
from marshmallow import fields

from happyly.google_pubsub import GoogleSimpleSender
from happyly.handling import Handler


class OutputSchema(marshmallow.Schema):
    request_id = fields.Str(required=True)
    label = fields.Str(request_id=True)


class MyHandler(Handler):

    def handle(self, message):
        return {
            'request_id': 'spam',
            'label': "eggs"
        }

    def on_handling_failed(self, message, error: Exception):
        print(repr(error))


if __name__ == '__main__':
    # auth setup here if needed

    GoogleSimpleSender(
        handler=MyHandler(),
        output_schema=OutputSchema(strict=True),
        to_topic='happyly_testing',
        project=os.environ['PROJECT'],
    ).run()

INFO:happyly.listening.executor:Message handled, status HandlingResultStatus.OK
INFO:happyly.listening.executor:Published result:
HandlingResult(status=<HandlingResultStatus.OK: 'OK'>, data={'request_id': 'spam', 'label': 'eggs'})


And you can even listen and reply with `GoogleSimpleReceiveAndReply`:

In [31]:
import marshmallow
from marshmallow import fields

from happyly.google_pubsub import GoogleSimpleReceiveAndReply
from happyly.handling import Handler


class InputSchema(marshmallow.Schema):
    request_id = fields.Str(required=True)
    label = fields.Str(request_id=True)


class OutputSchema(marshmallow.Schema):
    request_id = fields.Str(required=True)
    info = fields.Str(request_id=True)


class MyHandler(Handler):

    def handle(self, message):
        print(message['label'])
        return {
            'request_id': message['request_id'],
            'info': "eggs"
        }

    def on_handling_failed(self, message, error: Exception):
        print(repr(error))


if __name__ == '__main__':
    # auth setup here if needed

    future = GoogleSimpleReceiveAndReply(
        handler=MyHandler(),
        input_schema=InputSchema(strict=True),
        from_subscription='happyly_testing_01',
        output_schema=OutputSchema(strict=True),
        to_topic='happyly_testing_out',
        project=os.environ['PROJECT'],
    ).start_listening()

    try:
        future.result()
    except KeyboardInterrupt:
        future.cancel()


INFO:happyly.google_pubsub.subscribers:Starting to listen to happyly_testing_01


In [None]:
import marshmallow
from marshmallow import fields

from happyly.google_pubsub import GoogleCachedReceiveAndReply
from happyly.google_pubsub.redis_cacher import RedisCacher
from happyly.handling import Handler


class InputSchema(marshmallow.Schema):
    request_id = fields.Str(required=True)
    label = fields.Str(request_id=True)


class OutputSchema(marshmallow.Schema):
    request_id = fields.Str(required=True)
    info = fields.Str(request_id=True)


class MyHandler(Handler):

    def handle(self, message):
        print(message['label'])
        return {
            'request_id': message['request_id'],
            'info': "eggs"
        }

    def on_handling_failed(self, message, error: Exception):
        print(repr(error))


if __name__ == '__main__':
    # auth setup here if needed

    future = GoogleCachedReceiveAndReply(
        handler=MyHandler(),
        input_schema=InputSchema(strict=True),
        from_subscription='happyly_testing_01',
        output_schema=OutputSchema(strict=True),
        to_topic='happyly_testing_out',
        project=os.environ['PROJECT'],
        cacher=RedisCacher(host='localhost', port=6379)
    ).start_listening()

    try:
        future.result()
    except KeyboardInterrupt:
        future.cancel()


INFO:happyly.google_pubsub.redis_cacher:Cache was successfully initialized with Redis client (localhost:6379)
INFO:happyly.google_pubsub.subscribers:Starting to listen to happyly_testing_01
INFO:happyly.listening.executor:Received message:
 Message {
  data: b'{"request_id": "spam", "label": "eggs"}'
  attributes: {}
}
INFO:happyly.google_pubsub.redis_cacher:Cached message with id spam
INFO:happyly.listening.executor:Message handled, status HandlingResultStatus.OK


eggs


INFO:happyly.listening.executor:Received message:
 Message {
  data: b'{"request_id": "spam", "label": "eggs"}'
  attributes: {}
}
INFO:happyly.google_pubsub.redis_cacher:Cached message with id spam
INFO:happyly.listening.executor:Message handled, status HandlingResultStatus.OK


eggs


INFO:happyly.listening.executor:Published result:
HandlingResult(status=<HandlingResultStatus.OK: 'OK'>, data={'request_id': 'spam', 'info': 'eggs'})
INFO:happyly.google_pubsub.redis_cacher:Message with id spam was removed from cache
INFO:happyly.listening.executor:Published result:
HandlingResult(status=<HandlingResultStatus.OK: 'OK'>, data={'request_id': 'spam', 'info': 'eggs'})
INFO:happyly.google_pubsub.redis_cacher:Message with id spam was removed from cache


# What about some more customization?
You can use lower-level classes. For subscriptions there is a class `Listener`, which requires a subscriber and a deserializer. You can swap subscriber and deserializer to other implementations or use ones provided by happyly.

In [20]:
import marshmallow
from marshmallow import fields

from happyly.google_pubsub.deserializers import JSONDeserializerWithRequestIdRequired
from happyly.google_pubsub.subscribers import GooglePubSubSubscriber
from happyly.handling import Handler
from happyly.listening import Listener


class MySchema(marshmallow.Schema):
    request_id = fields.Str(required=True)
    info = fields.Str(required=True)


class MyHandler(Handler):

    def handle(self, message):
        print(f'Received message with {len(message)} attributes')
        print(f'request id is {message["request_id"]}')
        print(f'info is {message["info"]}')

    def on_handling_failed(self, message, error: Exception):
        print(repr(error))


if __name__ == '__main__':
    # auth setup here if needed

    listener = Listener(
        handler=MyHandler(),
        deserializer=JSONDeserializerWithRequestIdRequired(
            schema=MySchema(strict=True)
        ),
        subscriber=GooglePubSubSubscriber(
            project='my_shiny_project',
            subscription_name='happyly_testing_01'
        )
    )
    future = listener.start_listening()
    try:
        future.result()
    except KeyboardInterrupt:
        future.cancel()

Received message with 2 attributes
request id is spam
info is eggs


# Why yet another component: a deserializer?
Imagine you switch from Google Pub/Sub to another tool. Or probably you change the way you store message attributes from JSON to protobuf. Not a problem - just take or implement another `Deserializer` and the `Handler` with actual logic stays unchanged! Notice that here:

In [None]:
def handle(self, message):

message is just a Python dict. 

# Can I do similar customization for publishing?
Yep! Make sure `Handler.handle` returns a dict, then add Serializer and Publisher to the `Executor`. Or to `Listener` if you want to publish as a reaction to received message. Let's see:

In [11]:
import marshmallow
from marshmallow import fields

from happyly.google_pubsub.deserializers import JSONDeserializerWithRequestIdRequired
from happyly.google_pubsub.publishers import GooglePubSubPublisher
from happyly.google_pubsub.serializers import BinaryJSONSerializer
from happyly.handling import Handler
from happyly.listening.executor import Executor


class OutputSchema(marshmallow.Schema):
    request_id = fields.Str(required=True)
    code = fields.Int(required=True)


class MyHandler(Handler):

    def handle(self, message):
        print('Publishing......')
        return {
            'request_id': "hello",
            'code': 123,
        }

    def on_handling_failed(self, message, error: Exception):
        print(repr(error))


if __name__ == '__main__':
    # set up auth if needed here
    serializer = BinaryJSONSerializer(OutputSchema(strict=True))

    Executor(
        handler=MyHandler(),
        publisher=GooglePubSubPublisher(
            serializer=serializer,
            publish_all_to='happyly_testing',
            project='my_shiny_project',
        )
    ).run()

Publishing......


# Let's say i want to implement my own serializer. Or publisher. Or subscriber. How do I do that?
That's easy! Subclass the corresponding abstract base class and implement all abstract methods. That's it, a new component is ready for use.

In [32]:
import json
from typing import Any, Mapping

from happyly.serialization import Deserializer


class MyOwnDeserializer(Deserializer):

    def deserialize(self, message: Any) -> Mapping[str, Any]:
        return json.loads(message.attributes)

    def build_error_result(self, message: Any, error: Exception) -> Mapping[str, Any]:
        return {
            'request_id': message.attributes['request_id'],
            'status': 'ERROR',
            'error': repr(error),
        }

# I want to add more operations
You might want to use callbacks. Please first call base method.

In [23]:
from happyly.listening import Listener

import my_own_pretty_cache


class ListenerWithCaching(Listener):
    
    def on_received(self, message: Any):
        super().on_received(message)
        my_own_pretty_cache.save(message)

Hopefully you will use the library happily :)