
Create initial release #1

Closed
98 tasks done
adamcharnock opened this issue Aug 3, 2017 · 14 comments
adamcharnock commented Aug 3, 2017

Goals

  • Ease of development & debugging
  • Excellent tooling & documentation
  • Targeting smaller teams
  • High speed & low latency (but not at the expense of other goals)

MVP

  • Message transport layer
    • Message protocol (standard headers) – handled by individual transports
  • Result transport layer
  • Api definition layer
    • Api class
    • Event class
    • Metadata
    • Registry
    • API Autodiscovery
  • Event transport layer
  • Error responses
  • Ability to listen to multiple events with a single handler (already available in transports, just needs API)
  • Refactor marshalling code. We need a distinct place for the marshalling code to live, currently it is being pulled between the BusClient, Message, and XxxTransport classes.

Required prior to use in production

  • Command line interface
  • Interactive REPL (lightbus shell)
  • Code cleanup (prior to schema / proper serialising & encoding)
  • Basic schema functionality
    • Creation of jsonschema from method signatures
    • SchemaTransport
    • Exporting schema json
    • Loading schema json
    • Validation of outgoing RPC parameters
    • Validation of incoming RPC parameters
    • Validation of outgoing RPC responses
    • Validation of incoming RPC responses
    • Validation of outgoing event parameters
    • Validation of incoming event parameters
  • Configuration system (yaml format? create jsonschema?)
    • Config format
    • Config loading
    • Config validation
    • Config loading on lightbus creation
    • Make config available to BusClient
    • Use bus-level config parameters
    • Use api-level config parameters
    • Api-level custom log levels – Non-trivial implementation. Will leave for now.
    • Plugin-level config
      • Plugin-level config structure
      • Create plugins using config options
      • Respect enabled flag
    • API-specific transports (non-critical)
    • Work through in-code config: comments
  • Pass API name and event name to event handlers (& update examples)
  • Datetime type mapping
  • Short-form command line arguments
  • Deploy new docs (pending issue filed with GitHub)
  • Work through in-code TODOs
    • Configuration options
    • Move schema config to root level
  • Redis
    • Update RPC transport to use PUBSUB?
    • Update Event transport to use streams consumer groups
  • Add stream_use config option, which will allow for serving all events on the API from a single stream
  • Config loading from URL/Redis
  • Pluggable configuration source (Will do this if needed, we now support http[s]:// urls anyway)
  • @bus.on_start() and @bus.on_stop(). Consider refactoring plugins into this format too (the plugin system would have to operate around modules rather than classes, which may be a faff).
  • @bus.configure() to allow for per-service bus configuration. Is this a good idea? (Leave for now, will revisit if there is a need)
  • Look into (and implement?) structured logging (see issue) - Now spun off into Implement (optional) structured logging #10
  • Implement synchronous context methods on lightbus_set_database()
  • Improve errors when performing RPC call with non-keyword arguments (check events too)
  • Improve schema validation errors
  • If event streaming – processing of events should stop if an error is encountered. This won't be practical if running multiple workers.
  • Cleanup python imports
  • Rename BusNode to something else. The 'node' term refers to the data structure it forms, but it can appear that a 'node' refers to the service.
  • Wait for redis to come up rather than dying on startup. Lightbus should also reconnect to redis. – Actually, is this a good idea? Should the process be running if it isn't operational? If we support reconnection (which we should) then we must accept that the process will sometimes be running while not operational. Perhaps only try for 60 seconds though. – Implemented reconnections for rpc & event consumption. Decided that the pluggable nature of Lightbus' transports made it too hard to wait for redis to come up in a reliable way. Update: Lightbus now lazily connects to Redis, which may go some way towards mitigating the issues around this point.
  • Transactional Transport: The duplicates table needs to track uniqueness on both message ID and consumer name. Migration steps: (Removed, see Transactional Transport #4)
    • ALTER TABLE lightbus_processed_events DROP CONSTRAINT lightbus_processed_events_pkey;
    • ALTER TABLE lightbus_processed_events ADD COLUMN listener_name VARCHAR(200);
    • UPDATE lightbus_processed_events SET listener_name = 'default';
    • ALTER TABLE lightbus_processed_events ADD PRIMARY KEY (message_id, listener_name);
  • Django: I suspect the transactional middleware creates a new postgres connection for every request. (Removed, see Transactional Transport #4)
  • Test RPCs/events with binary data
  • consumer_group_prefix -> service_name
  • consume(consumer_group=...) -> consume(listener_name=...)
  • xxx.listen(fn, options["consumer_group"]) -> xxx.listen(fn, listener_name="...")
  • Warn if duplicate consumer name used (Less important now that listener_name is mandatory when setting up a new listener)
  • Explicit, non-global, API registration
    • Docs
  • Config inheritance (in branch feature/config-inheritance. I'm not convinced this is a good idea)
    • Docs - Will leave this for now
  • Programmatic scheduler
    • Docs
  • Non-global plugin registration
  • RFCs specifying protocol for all Redis transports
    • Event
    • RPC
    • Result
    • Schema
  • Fix CannotBlockHere exceptions / run user code in executors
  • Lazy-load schema
  • Migrate message IDs to UUIDs
  • Call to xpending should happen in a loop until there are no more messages
  • Standardise on command line argument position (before or after subcommand)
  • Cleanup old consumer groups and consumers
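The "Creation of jsonschema from method signatures" item above could look something like the following sketch. The helper name and the Python-to-JSON type mapping are mine, not the Lightbus implementation:

```python
import inspect

# Hypothetical sketch, not Lightbus' API: derive a JSON schema object from
# a method's signature using the built-in inspect module.
PYTHON_TO_JSON = {str: "string", int: "number", float: "number", bool: "boolean"}

def signature_to_jsonschema(fn) -> dict:
    """Build a JSON schema object describing fn's parameters."""
    properties = {}
    required = []
    for name, param in inspect.signature(fn).parameters.items():
        if name == "self":
            continue
        json_type = PYTHON_TO_JSON.get(param.annotation)
        properties[name] = {"type": json_type} if json_type else {}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # no default value, so the caller must supply it
    return {"type": "object", "properties": properties, "required": required}

# Example RPC-style method
def check_password(self, username: str, password: str, strict: bool = False):
    ...

schema = signature_to_jsonschema(check_password)
```

Parameters with defaults (such as `strict` above) become optional in the schema; everything else lands in `required`.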

Ideas for future work / notes:

  • Ability to bypass bus overheads for local RPC calls (suggestion).
  • Rename Meta to Options as this is more descriptive and doesn't clash with the 'metaclass' concept. Downside is that this diverges from Django.
  • Store the last result of each RPC call and event firing (for both success and failure). Needs some kind of stats backend?
  • Examine existing ways of defining API schemas: JSON schema (more generic), SWAGGER/OpenAPI (Rest-specific)
  • Adding testing utilities: Assert RPC was called, assert event was fired etc
    • Support each API having stubs (reference)
      • There was an interesting article recently on different kinds of testing, in particular for services. Mocks v. stubs v. ?, but I cannot find it again.
    • It should be possible for your software to function (for development purposes) without anything else operating on the bus. (I.e. mocking/stubbing)
  • Schema: https://pypi.python.org/pypi/schema ?
  • Test against the big list of naughty strings - it has some nice odd characters in there.
  • Use AlpacaJS to generate forms in the admin UI based on JSON schema
  • Create online configuration validator
  • Use code comments to populate json schema description field. (doc strings, #: style. Parse parameter descriptions out of docstring?)
  • Sanity check annotations for event listeners and RPCs. Issue a warning if annotated types are not JSON-compatible.
  • Management UI – consider refactoring away from React to something more lightweight. React is preventing me from wanting to pursue this further.
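The annotation sanity check idea above could be sketched as follows. `warn_on_non_json_annotations` is a hypothetical helper, not part of Lightbus:

```python
import inspect
import warnings

# Hypothetical helper: warn when an RPC/listener parameter annotation is
# unlikely to serialise cleanly to JSON.
JSON_COMPATIBLE = (str, int, float, bool, list, dict, type(None))

def warn_on_non_json_annotations(fn):
    for name, param in inspect.signature(fn).parameters.items():
        annotation = param.annotation
        if annotation is inspect.Parameter.empty:
            continue  # unannotated parameters cannot be checked
        if not (isinstance(annotation, type) and issubclass(annotation, JSON_COMPATIBLE)):
            warnings.warn(
                f"Parameter {name!r} of {fn.__name__}() is annotated with "
                f"{annotation!r}, which may not be JSON-compatible"
            )

class User:
    pass

def on_user_registered(user: User, email: str):
    ...

warn_on_non_json_annotations(on_user_registered)  # warns about 'user' only
```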

Reading

Pre launch tasks

  • Create celery comparison & migration guide
  • Create Django example + add docs for using with Django
  • @uses_django_db() - Either include in docs, or integrate it into Lightbus core (Docs for now, see Create Django Middleware #6 for future improvements)
  • Create contributing guide
  • Create code of conduct
  • Create discourse room
  • Add python linting (done already?)
  • Initial benchmarks and a caveat detailing that the project has yet to undergo any performance optimisation.
  • Finish docs
  • Document methods lacking docstrings
  • Release to PyPI
@adamcharnock

Update: We now have Redis RPC & Result brokers working.

Server

[screenshot: server output, 2017-10-08 17:45]

Client

[screenshot: client output, 2017-10-08 17:45]

@adamcharnock

Autodiscovery

I've decided to forgo autodiscovery for now. I think this could work well inside a predictable project structure – such as a Django project – but a general purpose solution would need to exhaustively import a lot of modules in order to discover bus.py modules.

I think this is probably doable, but for now it seems best to omit this particularly magical feature. I suspect more knowledge of Python's import system will be required to do this well.

Initial naive implementation was:

# Initial naive (and abandoned) implementation of bus.py autodiscovery
import importlib.util
from glob import glob
from pathlib import Path

def autodiscover(directory='.'):
    """Try to discover & import bus.py files"""
    lightbus_directory = Path(__file__).parent.resolve()
    # Find all bus.py files
    matches = [
        Path(match).resolve()
        for match in glob(str(Path(directory) / '**' / 'bus.py'), recursive=True)
    ]
    # Filter out any that are within the lightbus package
    matches = [m for m in matches if not str(m).startswith(str(lightbus_directory) + '/')]
    # Import each match
    for match in matches:
        # WARNING: This use of spec_from_file_location() is incorrect. The first
        # parameter needs to be the entire module name, but here we just use 'bus'.
        spec = importlib.util.spec_from_file_location('bus', str(match))
        bus_module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(bus_module)

@adamcharnock

After some thought, I think some basic autodiscovery will still be useful. Essentially using the first bus.py file yielded by a breadth-first search. Then determine the module name from sys.path, and perform the import.

However, only the top-level bus.py file will be imported; anything else will have to be imported from there. Will attempt implementation.
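The breadth-first search and sys.path-based module-name derivation described above can be sketched as follows (function names are illustrative, not the eventual implementation):

```python
import sys
from collections import deque
from pathlib import Path

# Illustrative sketch of breadth-first bus.py autodiscovery.
def find_bus_py(root="."):
    """Return the shallowest bus.py beneath root (breadth-first), or None."""
    queue = deque([Path(root).resolve()])
    while queue:
        directory = queue.popleft()
        candidate = directory / "bus.py"
        if candidate.exists():
            return candidate
        queue.extend(p for p in sorted(directory.iterdir()) if p.is_dir())
    return None

def module_name_for(path):
    """Derive the dotted module name by matching path against sys.path."""
    for entry in sys.path:
        try:
            relative = path.relative_to(Path(entry or ".").resolve())
        except ValueError:
            continue  # path does not live under this sys.path entry
        return ".".join(relative.with_suffix("").parts)
    raise ImportError(f"{path} is not importable from any sys.path entry")
```

The derived name can then be passed to importlib.import_module(), avoiding the incorrect spec_from_file_location('bus', ...) call in the naive version.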

@adamcharnock

Work has been continuing over the last few months. Items above are getting checked off pretty quickly now, so I'm hoping this proof of concept / MVP should be ready for the light of day in the next few months.

During this time we should also see the release of Redis 5.0, which will include streams (ref).

@adamcharnock

adamcharnock commented Mar 5, 2018

Latest thoughts on how to specify parameters for events. Unlike RPCs, event definitions are provided by class properties rather than methods. With RPCs we can extract the RPC signature from the method definition using Python's built-in inspect module.

With events we do not use method definitions, as doing so would imply that there should also be an implementation. The implementation should live in the API's clients, not the API itself. We therefore use class properties, but as a result we lose the ability to extract a parameter signature.

I've explored various ideas below regarding how we can specify parameters for events:

# How can we specify parameters for events?

class TestApi(Api):
    # The current implementation
    user_registered1 = Event(parameters=['username', 'email', 'is_admin'])

    # Simple, no default values, no **kwargs
    user_registered2 = Event(parameters={'username': str, 'email': str, 'is_admin': bool})

    # Verbose, but has default values and **kwargs
    user_registered3 = Event(parameters=[
        inspect.Parameter('username', kind=inspect.Parameter.KEYWORD_ONLY, annotation=str),
        inspect.Parameter('email', kind=inspect.Parameter.KEYWORD_ONLY, annotation=str),
        inspect.Parameter('is_admin', kind=inspect.Parameter.KEYWORD_ONLY, annotation=bool, default=False),
        # Note: inspect forbids default values on VAR_KEYWORD parameters
        inspect.Parameter('extra_fields', kind=inspect.Parameter.VAR_KEYWORD, annotation=str),
    ])

    # As above, but customised for more concise definitions
    user_registered4 = Event(parameters=[
        Parameter('username', str),
        Parameter('email', str),
        Parameter('is_admin', bool, default=False),
        WildcardParameter('extra_fields', str),
    ])

    # Does not support **kwargs. Parameters can no longer be inline
    user_registered5 = Event(parameters=RegistrationEventParameters)

Where RegistrationEventParameters is:

class RegistrationEventParameters(NamedTuple):
    username: str
    email: str
    is_admin: bool = False

Ultimately providing both options 1 & 4 seems preferable, in lieu of any better option.
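For illustration, option 4's concise classes could be built as thin wrappers over option 3's inspect.Parameter, which also yields an inspect.Signature for validating fired events. This is a sketch; the real Parameter/WildcardParameter classes may differ:

```python
import inspect

# Sketch: concise parameter classes built on inspect.Parameter.
class Parameter(inspect.Parameter):
    def __init__(self, name, annotation=inspect.Parameter.empty,
                 default=inspect.Parameter.empty):
        super().__init__(name, inspect.Parameter.KEYWORD_ONLY,
                         annotation=annotation, default=default)

class WildcardParameter(inspect.Parameter):
    # Note: inspect forbids defaults on VAR_KEYWORD parameters
    def __init__(self, name, annotation=inspect.Parameter.empty):
        super().__init__(name, inspect.Parameter.VAR_KEYWORD, annotation=annotation)

# An inspect.Signature then gives kwargs validation for free when firing:
signature = inspect.Signature([
    Parameter("username", str),
    Parameter("email", str),
    Parameter("is_admin", bool, default=False),
    WildcardParameter("extra_fields", str),
])
bound = signature.bind(username="joe", email="joe@example.com", referrer="ad")
bound.apply_defaults()
```

Binding raises TypeError for missing required parameters, while unexpected keyword arguments are collected into the wildcard parameter.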

@adamcharnock

adamcharnock commented Mar 10, 2018

What should the config system look like?

  • Portable and transferable config
  • Therefore no python files - something like yaml/json would be more suitable
  • Can load config from various sources
    • File
    • URL
    • Redis key (Maybe.)
    • Are these just built in, or do we need a ConfigTransport?
  • Ability to auto-reload process when config changes? Or perhaps that should be provided by a config_changed event on the state API?
  • Ability to edit config in web UI

What different kinds of config are there:

  • Global bus config
    • Plugins
    • Plugin hook timeouts
  • Per API config
    • Transports (+ configs)
  • Transport config
    • Serialisers
    • Connection parameters

Future development:

  • Per API config
    • e.g. different APIs can use different transports, timeouts etc

Scratch pad:

plugins:
  internal_state:
    ping_interval: 60
  
  internal_metrics:
    enable: false

bus:
  schema:
    transport:
      redis: {}

apis:
  default:
    event_transport:
      class: lightbus.RedisTransport
      redis_url: redis://username:password@redis

  auth:
    validate: false
    event_transport:
      class: lightbus.RedisTransport
      redis_url: redis://username:password@another_redis_host
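For illustration, the per-API transport section of the scratch pad could be loaded and validated along these lines. JSON is used here purely to keep the sketch stdlib-only (the YAML form maps to the same structure), and the loader/dataclass names are hypothetical:

```python
import json
from dataclasses import dataclass

# Hypothetical loader for the per-API transport config sketched above.
RAW = """
{
  "apis": {
    "default": {
      "event_transport": {
        "class": "lightbus.RedisTransport",
        "redis_url": "redis://username:password@redis"
      }
    }
  }
}
"""

@dataclass
class EventTransportConfig:
    class_path: str   # dotted path to the transport class
    redis_url: str

def load_api_transports(raw):
    data = json.loads(raw)
    transports = {}
    for api_name, api_config in data.get("apis", {}).items():
        et = api_config["event_transport"]
        transports[api_name] = EventTransportConfig(
            class_path=et["class"],
            redis_url=et["redis_url"],
        )
    return transports

transports = load_api_transports(RAW)
```

A missing key fails loudly with a KeyError, which is roughly the shape the "Config validation" task above would formalise.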

@adamcharnock

Currently working on narrative documentation:

[screenshot: narrative documentation preview, 2018-04-02]

@adamcharnock

adamcharnock commented Jun 25, 2018

A quick review of the various registries we are using:

  • Transport registry (internal to bus client)
  • Plugin registry (global)
  • API registry (global)
  • Listener registry (internal to bus client, mostly for internal use)
  • Schemas (internal to bus client, mostly for internal use)

Registries under consideration:

  • @bus.on_start() and @bus.on_stop() hooks (concerningly similar to the plugin hooks)
  • Config loader registry

@adamcharnock

adamcharnock commented Jul 31, 2018

Recording some notes on lightbus prometheus exporter metrics:

lightbus_listener_lag_events
lightbus_listener_processed_events
lightbus_listener_processed_seconds
lightbus_listener_errors
lightbus_events_total
lightbus_called_rpcs
lightbus_processed_rpcs

labels:

    api_name
    event_name
    service_name
    transport_name

how:

    get_transport_metrics() method on transport
    get_listener_metrics() method on transport
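A stdlib sketch of how such labelled counters might be accumulated before being exposed to Prometheus. LabelledCounter is a stand-in for prometheus_client's Counter, not real Lightbus code:

```python
from collections import defaultdict

# Illustrative stand-in for a Prometheus counter with labels: accumulate
# values per label combination, in a scrape-friendly shape.
class LabelledCounter:
    def __init__(self, name, labelnames):
        self.name = name
        self.labelnames = tuple(labelnames)
        self._values = defaultdict(float)

    def inc(self, amount=1.0, **labels):
        key = tuple(labels.get(label, "") for label in self.labelnames)
        self._values[key] += amount

    def collect(self):
        """Yield (labels, value) pairs, one per label combination."""
        for key, value in self._values.items():
            yield dict(zip(self.labelnames, key)), value

LABELS = ("api_name", "event_name", "service_name", "transport_name")
processed_events = LabelledCounter("lightbus_listener_processed_events", LABELS)
processed_events.inc(api_name="auth", event_name="user_registered",
                     service_name="emailer", transport_name="redis")
```

The hypothetical get_transport_metrics()/get_listener_metrics() methods mentioned above would presumably return data in this kind of shape for the exporter to serve.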

@adamcharnock

Note to self: I discuss versions and migrations on pages 199-200 of notebook 1.

@adamcharnock

Resolving CannotBlockHere errors

There has been an ongoing issue with the Lightbus API which looks something like this:

  • Developer has a synchronous on_start handler
  • Lightbus starts up, starts its event loop, and calls the synchronous on_start handler
  • The on_start handler calls something like bus.my.event.listen(...)
  • This is a thin wrapper which calls block(bus.my.event.listen_async(...))
  • The block() utility tries to spin up an event loop in order to run the listen_async coroutine
  • block() explodes with a CannotBlockHere exception because it is already running within an event loop, and you cannot nest event loops.

The solution to date has been that one must write async handlers (e.g. async def on_start()) if one wishes to do anything which interacts with the bus (which is needed to do pretty much anything useful). This is not desirable, as it forces async code on developers using Lightbus rather than keeping it optional.
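The failing path can be condensed into a minimal sketch (names here are illustrative, not the Lightbus source):

```python
import asyncio

# Condensed, illustrative sketch of the block() utility and its failure mode.
class CannotBlockHere(Exception):
    pass

def block(coroutine):
    """Run a coroutine to completion from synchronous code."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None
    if loop is not None:
        coroutine.close()  # suppress the 'coroutine was never awaited' warning
        raise CannotBlockHere("Already inside an event loop; loops cannot be nested")
    return asyncio.run(coroutine)

async def listen():
    return "subscribed"

result = block(listen())  # fine at the top level: no loop is running yet
```

Call block() from inside a coroutine already running on a loop (as a synchronous on_start handler effectively does) and it raises CannotBlockHere.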

The solution

The general solution is to run these hooks in an asyncio thread executor. Event loops are per-thread, so the hook is free to create its own loop, as it now has its own thread.

There is a problem with this solution, however: Lightbus has not been designed with threading in mind, and so is not thread-safe. Lightbus has been designed using asyncio, which puts Lightbus in control of context switches (i.e. at any await statement). Threads, on the other hand, can context switch at any point.

Could Lightbus become thread safe? Most likely yes. However, I prefer to avoid this at this stage because 1) threading bugs are hard to track down, 2) I'm not particularly knowledgeable in this area, and 3) I would like Lightbus to stabilise more before embarking on this.

There is an alternative, however. We are not using threads because we need threads; we are using them to provide a clean environment for user code (i.e. an environment in which an event loop can be started). We therefore came to the following solution:

  1. Only one asyncio task can be awake at once (across all threads). A global lock must be acquired in order to wake up a task. This prevents arbitrary context switching within tasks.
  2. The BusClient handles all bus interactions within the main thread when running as a worker. This ensures that the BusClient is only exposed to a single event loop.
  3. What about when not running as a worker? Let's do a test with gunicorn.
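The thread-executor idea above, in miniature: the synchronous hook runs in its own thread, so it can safely start its own event loop. All names here are illustrative, not the merged implementation:

```python
import asyncio

# Illustrative sketch: run a synchronous hook in a thread executor so it
# can start its own event loop without nesting.
async def register_listener():
    return "listener registered"

def sync_on_start():
    # Safe to block here: this runs in an executor thread with no running loop.
    return asyncio.run(register_listener())

async def worker_main():
    loop = asyncio.get_running_loop()
    # None selects the default ThreadPoolExecutor
    return await loop.run_in_executor(None, sync_on_start)

result = asyncio.run(worker_main())  # "listener registered"
```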

@adamcharnock

The above solution has now been implemented and merged. It appears to be working well.

adamcharnock changed the title from 'Create working proof-of-concept' to 'Create initial release' on Oct 29, 2019
@adamcharnock

All development tasks are now complete. All that's left are the 'Pre launch tasks', which relate mostly to some additional documentation sections.

@adamcharnock

Initial release is out! This epic issue can finally be closed, with all tasks complete 🎉
