Mita

A minimal task scheduling library for Go, built on cron expressions, with concurrency control, error tracking, and flexible configuration options.

Features

  • ๐Ÿ• Flexible Scheduling - Standard cron expressions and fluent builder API
  • ๐Ÿ”’ Concurrency Control - Configurable max concurrent tasks and overlap prevention
  • ๐Ÿ“Š Execution Tracking - Automatic statistics for runs, errors, and execution status
  • ๐ŸŽฏ Context Injection - Support for both static and dynamic context value injection
  • ๐Ÿ”„ Graceful Shutdown - Waits for running tasks to complete before shutting down
  • ๐Ÿš€ Manual Triggers - Execute tasks on-demand outside their regular schedule
  • ๐ŸŽ›๏ธ Task Management - Full CRUD operations: enable, disable, remove tasks
  • ๐Ÿ“ Logging - Customizable logging output with detailed execution info
  • ๐ŸŒ Timezone Support - Configure task execution timezone
  • โšก Second Precision - Support for second-level scheduling granularity
  • ๐Ÿ›ก๏ธ Thread-Safe - Safe for concurrent use across multiple goroutines
  • ๐Ÿ” Rich Metadata - Track added time, last run, next run, and more
  • ๐ŸŒ Web Interface - Built-in web UI for monitoring and managing tasks

Installation

go get github.com/cymoo/mita

Quick Start

Basic Usage

package main

import (
    "context"
    "fmt"
    "time"
    "github.com/cymoo/mita"
)

func main() {
    // Create task manager
    tm := mita.New()
    
    // Add a task that runs every 5 seconds
    tm.AddTask("hello", mita.Every().Seconds(5), func(ctx context.Context) error {
        fmt.Println("Hello, World!")
        return nil
    })
    
    // Start the manager
    tm.Start()
    
    // Run for a while
    time.Sleep(30 * time.Second)
    
    // Graceful shutdown
    tm.Stop()
}

Advanced Configuration

// Create task manager with options
tm := mita.New(
    mita.WithLogger(customLogger),           // Custom logger
    mita.WithLocation(location),             // Set timezone
    mita.WithMaxConcurrent(5),               // Max concurrent tasks
    mita.WithAllowOverlapping(false),        // Prevent overlapping
    mita.WithContextValue("env", "prod"),    // Inject context values
)

Schedule Expressions

Using Builder API (Recommended)

// Every second
mita.Every().Second()

// Every minute
mita.Every().Minute()

// Every hour
mita.Every().Hour()

// Every day
mita.Every().Day()

// Every N seconds
mita.Every().Seconds(30)

// Every N minutes
mita.Every().Minutes(15)

// Every N hours
mita.Every().Hours(6)

// Every N days
mita.Every().Days(2)

// Daily at specific time
mita.Every().Day().At(14, 30)  // 2:30 PM daily

// Specific weekday
mita.Every().Day().At(9, 0).OnWeekday(time.Monday)  // Monday 9:00 AM

// Specific day of month
mita.Every().Day().At(0, 0).OnDay(1)  // 1st of every month at midnight

Using Raw Cron Expressions

// Format: second minute hour day month weekday
mita.Cron("0 30 * * * *")     // Every hour at minute 30
mita.Cron("0 0 2 * * *")      // Daily at 2:00 AM
mita.Cron("0 */15 * * * *")   // Every 15 minutes
mita.Cron("0 0 0 1 * *")      // 1st of month at midnight
mita.Cron("0 0 9 * * 1")      // Every Monday at 9:00 AM
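
To make the six-field layout concrete, here is a minimal standard-library sketch that splits an expression into named fields. `cronFields` and `splitCron` are hypothetical helper names for illustration and are not part of mita; the sketch checks only the field count and does not validate the values.

```go
package main

import (
	"fmt"
	"strings"
)

// cronFields names the six positions of a seconds-enabled cron expression.
// Hypothetical helper type for illustration; not part of mita.
type cronFields struct {
	Second, Minute, Hour, Day, Month, Weekday string
}

// splitCron splits a six-field expression on whitespace.
// It checks only the number of fields, not whether each value is valid.
func splitCron(expr string) (cronFields, error) {
	parts := strings.Fields(expr)
	if len(parts) != 6 {
		return cronFields{}, fmt.Errorf("expected 6 fields, got %d", len(parts))
	}
	return cronFields{parts[0], parts[1], parts[2], parts[3], parts[4], parts[5]}, nil
}

func main() {
	f, err := splitCron("0 0 9 * * 1")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// prints: hour=9 minute=0 second=0 weekday=1
	fmt.Printf("hour=%s minute=%s second=%s weekday=%s\n", f.Hour, f.Minute, f.Second, f.Weekday)
}
```

A five-field expression such as "* * * * *" is rejected by this helper, which mirrors the note that the seconds field comes first.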

Configuration Options

WithLogger

Set a custom logger:

logger := log.New(os.Stdout, "[TASK] ", log.LstdFlags)
tm := mita.New(mita.WithLogger(logger))

WithLocation

Set timezone for task execution:

location, _ := time.LoadLocation("America/New_York")
tm := mita.New(mita.WithLocation(location))

WithMaxConcurrent

Limit maximum concurrent tasks (0 = unlimited):

tm := mita.New(mita.WithMaxConcurrent(3))

WithAllowOverlapping

Control whether the same task can run concurrently:

// Prevent same task from running multiple instances
tm := mita.New(mita.WithAllowOverlapping(false))

// Allow same task to run concurrently
tm := mita.New(mita.WithAllowOverlapping(true))

WithContextValue

Inject static context values available to all tasks:

tm := mita.New(
    mita.WithContextValue("database", dbConnection),
    mita.WithContextValue("cache", redisClient),
    mita.WithContextValue("env", "production"),
)

WithContextInjector

Dynamically inject context values per execution:

tm := mita.New(
    mita.WithContextInjector(func(ctx context.Context, taskName string) context.Context {
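        // Generate a unique ID for each execution. Keys use mita.CtxtKey so that
        // tasks can look them up with ctx.Value(mita.CtxtKey("request_id")).
        ctx = context.WithValue(ctx, mita.CtxtKey("request_id"), uuid.New().String())
        ctx = context.WithValue(ctx, mita.CtxtKey("timestamp"), time.Now())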
        return ctx
    }),
)

Task Management

Adding Tasks

err := tm.AddTask("backup", mita.Every().Day().At(2, 0), func(ctx context.Context) error {
    // Perform backup logic
    return nil
})
if err != nil {
    log.Fatal(err)
}

Manual Execution

Trigger a task immediately outside its schedule:

err := tm.RunTaskNow("backup")
if err != nil {
    log.Printf("Manual trigger failed: %v", err)
}

Disabling Tasks

Temporarily disable a task without removing it:

err := tm.DisableTask("backup")

Enabling Tasks

Re-enable a previously disabled task:

err := tm.EnableTask("backup")

Removing Tasks

Permanently remove a task:

err := tm.RemoveTask("backup")

Query Task Information

Get information about a specific task:

taskInfo, err := tm.GetTask("backup")
if err == nil {
    fmt.Printf("Task: %s\n", taskInfo.Name)
    fmt.Printf("Schedule: %s\n", taskInfo.Schedule)
    fmt.Printf("Run Count: %d\n", taskInfo.RunCount)
    fmt.Printf("Error Count: %d\n", taskInfo.ErrorCount)
    fmt.Printf("Last Run: %s\n", taskInfo.LastRun)
    fmt.Printf("Next Run: %s\n", taskInfo.NextRun)
    fmt.Printf("Enabled: %v\n", taskInfo.Enabled)
    fmt.Printf("Running: %v\n", taskInfo.Running)
}

List all tasks:

tasks := tm.ListTasks()
for _, task := range tasks {
    fmt.Printf("%s - Next: %s, Runs: %d, Errors: %d\n", 
        task.Name, task.NextRun, task.RunCount, task.ErrorCount)
}

Statistics

Get aggregated statistics:

stats := tm.GetStats()
fmt.Printf("Total Tasks: %v\n", stats["total_tasks"])
fmt.Printf("Enabled Tasks: %v\n", stats["enabled_tasks"])
fmt.Printf("Running Tasks: %v\n", stats["running_tasks"])
fmt.Printf("Total Runs: %v\n", stats["total_runs"])
fmt.Printf("Total Errors: %v\n", stats["total_errors"])
fmt.Printf("Max Concurrent: %v\n", stats["max_concurrent"])
fmt.Printf("Allow Overlapping: %v\n", stats["allow_overlapping"])

Working with Context

Retrieving Task Name

tm.AddTask("example", mita.Every().Minute(), func(ctx context.Context) error {
    taskName := mita.GetTaskName(ctx)
    fmt.Printf("Current task: %s\n", taskName)
    return nil
})

Accessing Injected Values

tm.AddTask("example", mita.Every().Minute(), func(ctx context.Context) error {
    // Get injected context values; checked assertions avoid a panic if one is missing
    db, ok := ctx.Value(mita.CtxtKey("database")).(*sql.DB)
    if !ok {
        return errors.New("database not found in context")
    }
    requestID, _ := ctx.Value(mita.CtxtKey("request_id")).(string)
    env, _ := ctx.Value(mita.CtxtKey("env")).(string)

    // Use injected values
    log.Printf("[%s] Processing in %s environment", requestID, env)
    rows, err := db.QueryContext(ctx, "SELECT * FROM users")
    if err != nil {
        return fmt.Errorf("query users: %w", err)
    }
    defer rows.Close()
    // ...
    return nil
})

Handling Context Cancellation

Always check for context cancellation in long-running tasks:

tm.AddTask("long-running", mita.Every().Hour(), func(ctx context.Context) error {
    for i := 0; i < 100; i++ {
        select {
        case <-ctx.Done():
            // Task manager is shutting down
            log.Println("Task cancelled, cleaning up...")
            return ctx.Err()
        default:
            // Continue work
            time.Sleep(time.Second)
            // Process item i
        }
    }
    return nil
})

Dynamic Context Updates

Update context values at runtime:

// Set or update a context value
tm.SetContextValue("feature_flag", true)

// Retrieve a context value
value := tm.GetContextValue("feature_flag")
if enabled, ok := value.(bool); ok && enabled {
    // Feature is enabled
}

Error Handling

Errors returned from task functions are automatically logged and tracked:

tm.AddTask("api-call", mita.Every().Minutes(5), func(ctx context.Context) error {
    resp, err := http.Get("https://api.example.com/data")
    if err != nil {
        return fmt.Errorf("API call failed: %w", err)
    }
    defer resp.Body.Close()
    
    if resp.StatusCode != 200 {
        return fmt.Errorf("API returned error status: %d", resp.StatusCode)
    }
    
    // Process response
    return nil
})

// Later, check for errors
taskInfo, err := tm.GetTask("api-call")
if err == nil && taskInfo.LastError != "" {
    log.Printf("Task last failed with: %s", taskInfo.LastError)
    log.Printf("Error rate: %d/%d (%.1f%%)", 
        taskInfo.ErrorCount, 
        taskInfo.RunCount,
        float64(taskInfo.ErrorCount)/float64(taskInfo.RunCount)*100)
}

Graceful Shutdown

The task manager supports graceful shutdown with proper cleanup:

func main() {
    tm := mita.New()
    // ... add tasks
    tm.Start()
    
    // Listen for system signals
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
    <-sigChan
    
    // Gracefully shutdown (waits up to 30 seconds for running tasks)
    tm.Stop()
}

The Stop() method:

  1. Cancels the manager context (stops new executions)
  2. Stops the cron scheduler
  3. Waits for all running tasks to complete (up to 30 seconds)
  4. Logs completion status

Web Management Interface

The task manager includes a built-in web interface for monitoring and managing tasks through your browser.

Starting the Web Server

package main

import (
    "context"
    "log"
    "net/http"

    "github.com/cymoo/mita"
)

func main() {
    tm := mita.New()
    
    // Add your tasks
    tm.AddTask("example", mita.Every().Minute(), func(ctx context.Context) error {
        // Task logic
        return nil
    })
    
    tm.Start()
    
    // Create web handler mounted at /tasks
    mux := tm.WebHandler("/tasks")
    
    // Start HTTP server; ListenAndServe only returns on error
    log.Fatal(http.ListenAndServe(":8080", mux))
}

Web Interface Features

The web interface provides:

  • Task List Page (/tasks/) - View all tasks with:
    • Real-time status (Enabled/Disabled/Running)
    • Execution statistics and success rates
    • Last run time and next scheduled run
    • Error information for failed tasks
    • Action buttons: Enable, Disable, Run Now, Remove
  • Statistics Page (/tasks/stats) - View aggregated metrics:
    • Total tasks, enabled tasks, running tasks
    • Total executions and error counts
    • Concurrency settings
    • Per-task performance with visual progress bars

Mounting at Custom Paths

// Mount at root
mux := tm.WebHandler("/")

// Mount at custom path
mux := tm.WebHandler("/admin/tasks")

// Integrate with existing HTTP server
existingMux := http.NewServeMux()
existingMux.Handle("/api/", apiHandler)
existingMux.Handle("/tasks/", tm.WebHandler("/tasks"))

Example with Authentication

func authMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // Check authentication
        if !isAuthenticated(r) {
            http.Error(w, "Unauthorized", http.StatusUnauthorized)
            return
        }
        next.ServeHTTP(w, r)
    })
}

func main() {
    tm := mita.New()
    tm.Start()
    
    mux := tm.WebHandler("/tasks")
    
    // Wrap with authentication
    http.ListenAndServe(":8080", authMiddleware(mux))
}

Integrating with Existing Applications

// Chi router
r := chi.NewRouter()
r.Mount("/", tm.WebHandler("/tasks"))

// Gin
router := gin.Default()
router.Any("/*any", gin.WrapH(tm.WebHandler("/tasks")))

// Echo
e := echo.New()
e.Any("/*", echo.WrapHandler(tm.WebHandler("/tasks")))

Complete Example

See _examples for a comprehensive, runnable example that demonstrates:

  • Multiple scheduling strategies
  • Concurrency control and overlap prevention
  • Error handling with simulated failures
  • Context injection (static and dynamic)
  • Task management operations (enable/disable)
  • Statistics and monitoring
  • Long-running tasks with cancellation
  • Graceful shutdown handling
  • Web interface integration

Run the example:

go run _examples/main.go

Best Practices

1. Set Appropriate Concurrency Limits

// For CPU-intensive tasks
tm := mita.New(mita.WithMaxConcurrent(runtime.NumCPU()))

// For I/O-bound tasks
tm := mita.New(mita.WithMaxConcurrent(20))

2. Handle Context Cancellation

Always respect context cancellation in long-running tasks:

func(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            // Do work
        }
    }
}

3. Return Meaningful Errors

func(ctx context.Context) error {
    if err := doWork(); err != nil {
        return fmt.Errorf("failed to process batch %d: %w", batchID, err)
    }
    return nil
}

4. Prevent Overlapping for Critical Tasks

tm := mita.New(mita.WithAllowOverlapping(false))

5. Monitor Task Health

// Periodically check task statistics
ticker := time.NewTicker(5 * time.Minute)
go func() {
    for range ticker.C {
        stats := tm.GetStats()
        errorRate := float64(stats["total_errors"].(int64)) / float64(stats["total_runs"].(int64))
        if errorRate > 0.1 { // More than 10% errors
            alert("High task error rate detected")
        }
    }
}()
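
When computing the rate, it helps to guard the case where nothing has run yet, since 0/0 yields NaN in floating point and the threshold comparison then silently evaluates to false. A minimal sketch, with `errorRate` as a hypothetical helper:

```go
package main

import "fmt"

// errorRate returns totalErrors divided by totalRuns.
// It returns 0 when nothing has run yet, avoiding a NaN from 0/0.
func errorRate(totalErrors, totalRuns int64) float64 {
	if totalRuns <= 0 {
		return 0
	}
	return float64(totalErrors) / float64(totalRuns)
}

func main() {
	fmt.Printf("%.2f\n", errorRate(0, 0))  // 0.00
	fmt.Printf("%.2f\n", errorRate(3, 20)) // 0.15
}
```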

6. Use Timeouts for External Calls

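tm.AddTask("api-call", schedule, func(ctx context.Context) error {
    ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()

    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return fmt.Errorf("build request: %w", err)
    }
    resp, err := client.Do(req)
    if err != nil {
        return fmt.Errorf("request failed: %w", err)
    }
    defer resp.Body.Close()
    // ...
    return nil
})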

7. Clean Up Resources

tm.AddTask("db-task", schedule, func(ctx context.Context) error {
    conn := pool.Get()
    defer conn.Close()
    
    // Use connection
    return nil
})

Performance Considerations

  • Memory Usage: Each task stores minimal metadata (~200 bytes)
  • Goroutines: One goroutine per concurrent task execution
  • Lock Contention: Read-write locks minimize contention on task metadata
  • Cron Performance: Uses the highly optimized robfig/cron library

Thread Safety

All mita methods are thread-safe and can be called concurrently:

// Safe to call from multiple goroutines
go tm.AddTask(name1, schedule1, task1)
go tm.AddTask(name2, schedule2, task2)
go tm.RunTaskNow(name1)
go tm.GetStats()

Limitations

  • Graceful shutdown waits at most 30 seconds for running tasks
  • Task names must be unique
  • Cron expressions use 6 fields (seconds supported)
  • Context values are copied, not referenced (use pointers for shared state)

FAQ

Q: Can I update a task's schedule without removing it?
A: Currently, you need to remove and re-add the task. A future version may support schedule updates.

Q: What happens if a task is already running when triggered manually?
A: If AllowOverlapping is false, you'll get an error. If true, both instances will run.

Q: How do I handle tasks that might run longer than their interval?
A: Set WithAllowOverlapping(false) to skip executions if the previous one is still running.

Q: Can I pause the entire task manager?
A: Not directly. You can disable all tasks individually or stop and restart the manager.

Q: Is it safe to modify context values during execution?
A: Use SetContextValue() to update values. Changes apply to new executions, not running ones.

Q: Can I customize the web interface?
A: The web interface is embedded in the library. For customization, you can build your own interface using its API methods.

Q: Is the web interface secure?
A: The web interface has no built-in authentication. Always add authentication middleware when exposing it publicly (see examples above).

Testing

To test your tasks:

func TestMyTask(t *testing.T) {
    tm := mita.New()
    
    // atomic.Bool (sync/atomic) avoids a data race between the task goroutine and the test
    var executed atomic.Bool
    tm.AddTask("test", mita.Every().Second(), func(ctx context.Context) error {
        executed.Store(true)
        return nil
    })
    
    tm.Start()
    time.Sleep(2 * time.Second)
    tm.Stop()
    
    if !executed.Load() {
        t.Error("Task was not executed")
    }
}

License

MIT License - see LICENSE file for details.
