ajitashwath/taskflow

TaskFlow

An event-driven task processing backend built with Go. TaskFlow provides a scalable worker pool architecture for asynchronous task processing with PostgreSQL persistence.

Features

  • RESTful API - Clean HTTP API for task management
  • Worker Pool - Configurable concurrent task processing
  • PostgreSQL Storage - Reliable task persistence with JSONB support
  • Task Lifecycle - Track tasks through pending, running, done, and failed states
  • Graceful Shutdown - Clean worker termination and server shutdown
  • Structured Logging - JSON-formatted logs with slog
  • Docker Support - Easy local PostgreSQL setup with docker-compose

Architecture

TaskFlow follows a clean architecture pattern:

cmd/api/              # Application entry point
internal/
  ├── api/           # HTTP layer
  │   ├── handlers/  # Request handlers
  │   ├── middleware/# HTTP middleware
  │   ├── models/    # API models
  │   └── router.go  # Route definitions
  ├── config/        # Configuration management
  ├── storage/       # Data persistence layer
  │   ├── postgres/  # PostgreSQL implementation
  │   └── memory/    # In-memory implementation
  └── worker/        # Background task processing

Prerequisites

  • Go 1.22+ - Download from https://go.dev/dl/
  • PostgreSQL 14+ - Local installation or cloud instance (Supabase, AWS RDS, etc.)
  • Docker (optional) - For local PostgreSQL via docker-compose

Installation

  1. Clone the repository:

    git clone https://github.com/ajitashwath/taskflow.git
    cd taskflow
  2. Install dependencies:

    go mod download

Configuration

TaskFlow uses environment variables for configuration:

Variable   Description                            Default
PORT       HTTP server port                       8080
ENV        Environment (development/production)   development
DB_DSN     PostgreSQL connection string           postgres://user:password@localhost:5432/taskflow?sslmode=disable

Database Connection String Format

Standard PostgreSQL:

postgres://user:password@host:port/dbname?sslmode=disable

Supabase:

# Transaction mode (recommended)
postgres://user:password@host:6543/postgres?sslmode=require

# Session mode
postgres://user:password@host:5432/postgres?sslmode=require
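
Passwords containing characters such as `@` or `:` must be percent-encoded before they go into a connection string of this form. The sketch below builds a DSN with Go's standard `net/url` package; `BuildDSN` is a hypothetical helper for illustration and is not part of TaskFlow.

```go
package main

import (
	"fmt"
	"net/url"
)

// BuildDSN assembles a postgres:// URL and percent-encodes the credentials.
// Hypothetical helper; TaskFlow itself reads the finished string from DB_DSN.
func BuildDSN(user, password, host string, port int, dbname, sslmode string) string {
	u := url.URL{
		Scheme:   "postgres",
		User:     url.UserPassword(user, password),
		Host:     fmt.Sprintf("%s:%d", host, port),
		Path:     "/" + dbname,
		RawQuery: url.Values{"sslmode": {sslmode}}.Encode(),
	}
	return u.String()
}

func main() {
	// Prints: postgres://user:p%40ss%3Aword@localhost:5432/taskflow?sslmode=disable
	fmt.Println(BuildDSN("user", "p@ss:word", "localhost", 5432, "taskflow", "disable"))
}
```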

Running Locally

Option 1: Using Docker Compose (Recommended)

  1. Start PostgreSQL:

    docker-compose up -d
  2. Run the server:

    go run cmd/api/main.go
  3. Verify it's running:

    curl localhost:8080/health

Option 2: Using External PostgreSQL

  1. Set the database connection string:

    Linux/macOS:

    export DB_DSN="postgres://user:password@host:port/dbname?sslmode=disable"

    Windows PowerShell:

    $env:DB_DSN="postgres://user:password@host:port/dbname?sslmode=disable"

    Windows CMD:

    set DB_DSN=postgres://user:password@host:port/dbname?sslmode=disable
  2. Run the server:

    go run cmd/api/main.go

Option 3: Using .env File

  1. Create a .env file (already gitignored):

    PORT=8080
    ENV=development
    DB_DSN=postgres://user:password@localhost:5432/taskflow?sslmode=disable
  2. Load environment and run:

    # Linux/macOS
    export $(cat .env | xargs) && go run cmd/api/main.go
    
    # Or use a tool like direnv or godotenv

API Documentation

Health Check

Check server status and environment.

GET /health

Response:

{
  "status": "ok",
  "env": "development"
}

Create Task

Submit a new task for processing.

POST /tasks
Content-Type: application/json

{
  "type": "email",
  "payload": {
    "to": "user@example.com",
    "subject": "Hello",
    "body": "Welcome to TaskFlow"
  }
}

Response (201 Created):

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "type": "email",
  "payload": {
    "to": "user@example.com",
    "subject": "Hello",
    "body": "Welcome to TaskFlow"
  },
  "status": "pending",
  "created_at": "2024-02-09T10:30:00Z",
  "updated_at": "2024-02-09T10:30:00Z"
}

cURL Example:

curl -X POST http://localhost:8080/tasks \
  -H "Content-Type: application/json" \
  -d '{
    "type": "email",
    "payload": {
      "to": "user@example.com"
    }
  }'

PowerShell Example:

Invoke-RestMethod -Method Post -Uri "http://localhost:8080/tasks" `
  -Body (@{type="email"; payload=@{to="user@example.com"}} | ConvertTo-Json) `
  -ContentType "application/json"

Get Task

Retrieve a specific task by ID.

GET /tasks/{id}

Response (200 OK):

{
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "type": "email",
  "payload": {
    "to": "user@example.com"
  },
  "status": "done",
  "created_at": "2024-02-09T10:30:00Z",
  "updated_at": "2024-02-09T10:30:02Z"
}

cURL Example:

curl http://localhost:8080/tasks/550e8400-e29b-41d4-a716-446655440000

List Tasks

Retrieve all tasks, ordered by creation date (newest first).

GET /tasks

Response (200 OK):

[
  {
    "id": "550e8400-e29b-41d4-a716-446655440000",
    "type": "email",
    "payload": {
      "to": "user@example.com"
    },
    "status": "done",
    "created_at": "2024-02-09T10:30:00Z",
    "updated_at": "2024-02-09T10:30:02Z"
  }
]

cURL Example:

curl http://localhost:8080/tasks

PowerShell Example:

Invoke-RestMethod -Method Get -Uri "http://localhost:8080/tasks"

Task Lifecycle

Tasks progress through the following states:

  1. pending - Task created and queued for processing
  2. running - Task picked up by a worker
  3. done - Task completed successfully
  4. failed - Task encountered an error (planned; not yet implemented)
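
The states above can be modeled as string constants with an explicit transition table. This is a sketch under assumptions: `Status`, the constant names, and `CanTransition` are illustrative, and the README does not say whether TaskFlow enforces transitions this way.

```go
package main

import "fmt"

// Status is a task lifecycle state (hypothetical type).
type Status string

const (
	StatusPending Status = "pending"
	StatusRunning Status = "running"
	StatusDone    Status = "done"
	StatusFailed  Status = "failed"
)

// allowed lists the forward transitions implied by the lifecycle;
// done and failed are terminal, so they have no entries.
var allowed = map[Status][]Status{
	StatusPending: {StatusRunning},
	StatusRunning: {StatusDone, StatusFailed},
}

// CanTransition reports whether a task may move from one state to another.
func CanTransition(from, to Status) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(CanTransition(StatusPending, StatusRunning)) // true
	fmt.Println(CanTransition(StatusDone, StatusRunning))    // false
}
```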

Worker Pool

The worker pool processes tasks asynchronously:

  • Default Configuration: 5 workers, queue size of 10
  • Processing Time: 2 seconds per task (simulated)
  • Graceful Shutdown: Workers complete current tasks before stopping

To modify worker pool settings, edit cmd/api/main.go:

// NewPool(numWorkers, queueSize, taskStore)
wp := worker.NewPool(10, 20, taskStore)  // 10 workers, queue size of 20

Database Schema

CREATE TABLE tasks (
    id UUID PRIMARY KEY,
    type VARCHAR(50) NOT NULL,
    payload JSONB NOT NULL,
    status VARCHAR(20) NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE INDEX idx_tasks_status ON tasks(status);

Development

Project Structure

taskflow/
├── cmd/
│   └── api/
│       └── main.go              # Application entry point
├── internal/
│   ├── api/
│   │   ├── handlers/
│   │   │   ├── health.go       # Health check handler
│   │   │   └── tasks.go        # Task CRUD handlers
│   │   ├── middleware/
│   │   │   └── logger.go       # Request logging middleware
│   │   ├── models/
│   │   │   └── task.go         # Task domain models
│   │   └── router.go           # HTTP router setup
│   ├── config/
│   │   └── config.go           # Configuration loader
│   ├── storage/
│   │   ├── postgres/
│   │   │   ├── migrations.go   # Database migrations
│   │   │   └── task_store.go   # PostgreSQL implementation
│   │   ├── memory/
│   │   │   └── task_store.go   # In-memory implementation
│   │   └── repository.go       # Storage interface
│   └── worker/
│       └── worker.go            # Worker pool implementation
├── migrations/
│   └── create_tasks_table.up.sql
├── docker-compose.yml
├── go.mod
├── go.sum
├── .gitignore
└── README.md

Building for Production

# Build binary
go build -o taskflow cmd/api/main.go

# Run binary
./taskflow

Running Tests

# Run all tests
go test ./...

# Run tests with coverage
go test -cover ./...

# Run tests with verbose output
go test -v ./...

Deployment

Environment Variables for Production

PORT=8080
ENV=production
DB_DSN=postgres://user:password@production-host:5432/taskflow?sslmode=require

Docker Deployment

Dockerfile (create this):

FROM golang:1.22-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o taskflow cmd/api/main.go

FROM alpine:latest
RUN apk --no-cache add ca-certificates
WORKDIR /root/
COPY --from=builder /app/taskflow .
EXPOSE 8080
CMD ["./taskflow"]

Build and run:

docker build -t taskflow .
docker run -p 8080:8080 -e DB_DSN="your-connection-string" taskflow

Monitoring

TaskFlow uses structured JSON logging. All logs are written to stdout.

Log Format:

{
  "time": "2024-02-09T10:30:00Z",
  "level": "INFO",
  "msg": "Request processed",
  "method": "POST",
  "path": "/tasks",
  "status": 201,
  "duration": "5ms"
}

Troubleshooting

Connection Issues

Error: Failed to connect to database

Solutions:

  • Verify DB_DSN environment variable is set correctly
  • Check PostgreSQL is running: docker-compose ps or pg_isready
  • Test connection string with psql
  • For Supabase, ensure the port matches the connection mode (6543 for transaction mode, 5432 for session mode)

Migration Issues

Error: Failed to run migrations

Solutions:

  • Ensure your database user has CREATE TABLE privileges
  • Check PostgreSQL logs for detailed error messages
  • Verify database exists and is accessible

Queue Full Warnings

Warning: Task queue full, dropping task

Solutions:

  • Increase queue size in cmd/api/main.go
  • Add more workers to process tasks faster
  • Implement retry logic or persistent queue

Contributing

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

Support

For issues, questions, or contributions, please open an issue on GitHub.
