Skip to content

coherentdevs/go-service-framework

Repository files navigation

go-service-framework

This Go package provides primitives for building a Go service, specifically the manager, worker pool, and poller. The package's main purposes include:

Features

  • Manager: Manages grpc, http, and background services for the service
  • Worker Pool: An all-purpose worker pool that handles parallelizing of tasks. Worker pools can be chained together and these chains can be used to handle complex data flows.
  • Poller: A struct that polls a blockchain and feeds the data through an ETL pipeline using chained worker pools.

Installation

To install the package, you can use the go get command:

go get github.com/<your-username>/go-service-framework

Usage

Here is a generic Go service utilizing the framework to run a poller:

package main

import (
	"github.com/coherentdevs/go-service-framework/manager"
	"github.com/coherentdevs/go-service-framework/poller"
	"github.com/coherentdevs/go-service-framework/pool"
	"log"
	"net/http"
	"net/http/pprof"
)

func main() {
	mgr := manager.New(manager.WithoutGracefulShutdown())

	// load environment configuration, logging and other preliminaries
	cfg := initializeConfig(mgr.Logger())

	nodeClient := initializeNodeClient(cfg, mgr.Logger())
	dataCache := initializeCache(cfg, mgr.Logger())
	dataConnector := initializeDataConnector(mgr.Context(), cfg, mgr.Logger())

	tt := pool.NewThrottler(cfg.FetcherPoolThrottleBandwidth, cfg.FetcherPoolThrottleDuration)
	wp1 := pool.NewWorkerPool(
		"fetcher",
		pool.WithOutputChannel(),
		pool.WithThrottler(tt),
		pool.WithLogger(mgr.Logger()),
		pool.WithBandwidth(cfg.FetcherPoolBandwidth),
	)
	wp2 := pool.NewWorkerPool(
		"accumulator",
		pool.WithOutputChannel(),
		pool.WithLogger(mgr.Logger()),
		pool.WithBandwidth(cfg.AccumulatorPoolBandwidth),
	)
	wp3 := pool.NewWorkerPool(
		"writer",
		pool.WithLogger(mgr.Logger()),
		pool.WithBandwidth(cfg.WriterPoolBandwidth),
	)

	driver := initializeDriver(cfg.Poller.Blockchain, nodeClient, dataConnector, mgr.Logger())

	p := poller.New(
		&cfg.Poller,
		driver,
		poller.WithCache(dataCache),
		poller.WithFetchPool(wp1),
		poller.WithAccumulatePool(wp2),
		poller.WithWritePool(wp3),
		poller.WithLogger(mgr.Logger()),
		poller.WithMetrics(mgr.Metrics()),
	)

	srv := initializeServer(p, mgr.Logger())
	apiSrv := http.Server{
		Addr:    cfg.HttpServePort,
		Handler: srv.Router(),
	}

	// Register background services
	mgr.RegisterBackgroundSvc("fetcher pool", wp1.Start, wp1.Stop)
	mgr.RegisterBackgroundSvc("accumulator pool", wp2.Start, wp2.Stop)
	mgr.RegisterBackgroundSvc("writer pool", wp3.Start, wp3.Stop)
	mgr.RegisterBackgroundSvc("poller", p.Start, p.Stop)
	mgr.RegisterBackgroundSvc("throttler", tt.Start, tt.Stop)

	// Register HTTP servers
	mgr.RegisterHttpServer("api", &apiSrv)

	// Wait for interrupt signal
	mgr.WaitForInterrupt()
}

In this code:

  • The manager.New() function is used to create a new instance of the manager object, which controls the life cycle of all services and servers.
  • The poller.New() function is used to create a new poller object, which fetches, accumulates, and writes data.
  • The mgr.RegisterBackgroundSvc() function is used to register background services with the manager. These services will start when the manager starts and stop when the manager stops.
  • The mgr.RegisterHttpServer() function is used to register HTTP servers with the manager. These servers will start when the manager starts and stop when the manager stops.
  • The mgr.WaitForInterrupt() function makes the main function wait for an interrupt signal before stopping all services and servers.

Documentation

Below are details about key components of the package.

Manager

Manages grpc, http, and background services for the service. It helps in coordinating different types of services running in a Go application.

Worker Pool

The worker pool is an all-purpose worker pool that handles parallelizing of tasks. Worker pools can be chained together, and these chains can be used to handle complex data flows. Here is the main code for the Worker Pool:

Refer to the code in the worker_pool.go file in the pool directory.

It includes methods to manage the worker pool lifecycle such as Start, Stop, FlushAndRestart and methods to handle jobs and groups like PushJob, PushGroup, SetInputFeed, SetGroupInputFeed.

Poller

A module that polls a blockchain and feeds the data through an ETL pipeline using chained worker pools. It is used for ETLing blockchain data, utilizing worker pools to optimize efficiency and speed.

Refer to the code in the poller.go file in the poller directory.

It has modes which determine what the poller does on each iteration of its main routine's loop; these are determined by the distance of the cursor from chaintip and the success/failure state of the previous iteration.

Drivers for specific chains can be found in https://github.com/coherentdevs/evm-etl