Skip to content

bmeares/mrsm-compose-template

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

6 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Meerschaum Compose Project Template

This template is for quickly building Dockerized Meerschaum Compose projects. For a more detailed example, refer to the Tech Slam 'N Eggs example repository.

Docker is not required but docker/Dockerfile is provided for convenience.

Plugins

Meerschaum's plugin system allows you to do the following:

Consult the plugins documentation for how to write your own plugins. The plugin example has been added as reference for plugin:example and example:{label}.

Getting Started

Build and start the container:

docker compose up --build -d

Jump into a shell:

docker compose exec -it mrsm-compose bash

Once inside the container, you may now make changes and begin your development process. Here are some useful commands to get started:

mrsm compose explain

This will parse your compose file and print your current environment and state of the pipes.

mrsm compose run

The default command in docker/bootstrap.sh is mrsm compose run because it does the following:

  • Register and update the parameters for your pipes.
  • Sync them one-by-one.

Flags you pass to compose run are passed to sync pipes, including custom arguments added via add_plugin_argument().

Other useful compose commands:

  • compose down -v
    Stop any running jobs and delete the pipes (-v or --drop).
  • compose up --dry
    Register or update the pipes' registrations (--dry prevents syncing from happening).
  • compose ps
    Print the currently running jobs (started by compose up).

All other commands are executed within the context of the isolated environment (with the flag --tags appended as well). You may manage your pipes as usual with any regular mrsm commands.

fetch() vs sync()

In most cases, a simple fetch() function is all that's needed to get the job done. fetch() returns data or a generator of chunks to be passed into Pipe.sync():

import meerschaum as mrsm
from meerschaum.utils.typing import List, Dict, Any

def fetch(
        pipe: mrsm.Pipe,
        **kwargs: Any
    ) -> List[Dict[str, Any]]:
    """
    Return any of the following:

    - Pandas DataFrame
    - List of dictionaries
    - Dictionary of lists
    - A generator of the following (yield chunks)
    """
    return [
        {
            'datetime': '2023-01-01',
            'id': 1,
            'value': 100.0,
        },
        {
            'datetime': '2023-01-01',
            'id': 2,
            'value': 200.0,
        },
    ]

For more fine-grained control, sync() expects a SuccessTuple (bool and str). sync() overrides fetch(), so you may use fetch() within sync(). Consider this example which resyncs days with different rowcounts:

from datetime import datetime, timedelta
import meerschaum as mrsm
from meerschaum.utils.typing import Any, SuccessTuple
from meerschaum.utils.misc import round_time

required = ['python-dateutil']

def sync(
        pipe: mrsm.Pipe,
        begin: Optional[datetime] = None,
        end: Optional[datetime] = None,
        **kwargs: Any
    ) -> SuccessTuple:
    """
    Custom syncing strategy: resync days with different rowcounts.
    """
    from dateutil.rrule import rrule, DAILY

    begin = begin or datetime(2023, 1, 1)
    end = end or datetime(2024, 1, 1)

    days_synced = 0
    for day in rrule(freq=DAILY, dtstart=begin, until=end):
        chunk_begin = day
        chunk_end = day + timedelta(days=1)

        pipe_rowcount = pipe.get_rowcount(begin=chunk_begin, end=chunk_end)
        remote_rowcount = get_remote_rowcount(pipe, begin=chunk_begin, end=chunk_end)
        
        if pipe_rowcount == remote_rowcount:
            continue

        docs = fetch(pipe, begin, end)
        chunk_success, chunk_msg = pipe.sync(docs, **kwargs)
        if not chunk_success:
            return chunk_success, f"Failed to sync {day}:\n" + chunk_msg

        days_synced += 1
    
    return True, f"Successfully synced {days_synced} days."


def get_remote_rowcount(
        pipe: mrsm.Pipe,
        begin: datetime,
        end: datetime,
    ) -> int:
    ...


def fetch(
        pipe: mrsm.Pipe,
        begin: datetime,
        end: datetime,
    ) -> List[Dict[str, Any]]:
    ...

Publishing Your Plugins

If you choose to publish your plugins to the public repository, make an account at https://api.mrsm.io and run register plugin, e.g. (assuming my-awesome-plugin.py exists):

mrsm compose login api:mrsm
mrsm compose register plugin my-awesome-plugin

You can view your published plugin at https://api.mrsm.io/dash/plugins.

You may also publish your plugins to your private repository with --repository or -r:

### Define api:private through the wizard.
### On your repository's host, start the repository with `mrsm start api`.
mrsm compose bootstrap connector
mrsm compose register plugin my-awesome-plugin -r api:private

To remove the plugin from the repository, run mrsm delete plugin:

mrsm compose delete plugin my-awesome-plugin

About

Bootstrap your Meerschaum Compose project with this template repository.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published