Skip to content

danenmao/pterergate-dtf

Repository files navigation

pterergate-dtf

Pterergate-dtf (Pterergate Distributed Task Framework, PDTF) is a high-performance distributed task framework that supports scheduling, in parallel, thousands of running tasks deployed in a cluster consisting of tens of thousands of nodes.

Go GoDoc Go Report Card GitHub

GitHub last commit (branch) GitHub commit activity (branch)

Install

go get github.com/danenmao/pterergate-dtf

Requirement

  1. MySQL

    PDTF uses a MySQL table tbl_task to store the information of created tasks. Users should provide a MySQL server, and create this table in a database.

    See the Usage part to learn more.

  2. Redis

    PDTF uses Redis frequently to store various kinds of intermediate data. Users should provide a Redis server.

    See the Usage part to learn more.

Get Started

Read the Get Started wiki to learn how to use PDTF.

Usage

  1. Implement ITaskGenerator, ITaskExecutor, ITaskSchedulerCallback and ITaskCollectorCallback. Users can perform their business logic in these interfaces.

    // SampleGenerator is the user type intended to implement
    // taskmodel.ITaskGenerator (its methods are omitted in this sample).
    type SampleGenerator struct{}
    
    // SampleExecutor is the user type intended to implement
    // taskmodel.ITaskExecutor (its methods are omitted in this sample).
    type SampleExecutor struct{}
    
    // SampleSchedulerCallback is the user type intended to implement
    // taskmodel.ITaskSchedulerCallback (its methods are omitted in this sample).
    type SampleSchedulerCallback struct{}
    
    // SampleCollectorCallback is the user type intended to implement
    // taskmodel.ITaskCollectorCallback (its methods are omitted in this sample).
    type SampleCollectorCallback struct{}
  2. Implement a task plugin.

    // SamplePlugin implements taskplugin.ITaskPlugin by handing back a
    // pre-built plugin body and plugin configuration.
    type SamplePlugin struct {
        PluginBody taskmodel.PluginBody
        PluginConf taskmodel.PluginConf
    }
    
    // GetPluginConf copies the plugin's configuration into conf.
    // It always returns nil.
    func (p *SamplePlugin) GetPluginConf(conf *taskmodel.PluginConf) error {
        *conf = p.PluginConf
        return nil
    }
    
    // GetPluginBody copies the plugin's body into body.
    // It always returns nil.
    func (p *SamplePlugin) GetPluginBody(body *taskmodel.PluginBody) error {
        *body = p.PluginBody
        return nil
    }
    
    // plugin is the sample plugin instance: it wires the four sample
    // implementations into the plugin body and sets the plugin configuration.
    var plugin = SamplePlugin{
        PluginBody: taskmodel.PluginBody{
            Generator:         &SampleGenerator{},
            Executor:          &SampleExecutor{},
            SchedulerCallback: &SampleSchedulerCallback{},
            CollectorCallback: &SampleCollectorCallback{},
        },
        PluginConf: taskmodel.PluginConf{
            IterationMode:   taskmodel.IterationMode_No,
            TaskTypeTimeout: time.Hour,
        },
    }
  3. Register a task type.

    // SampleTaskType is the numeric identifier used for the sample task type.
    const SampleTaskType = 1
    
    // Describe the task type. PluginFactoryFn stores the plugin instance in *p.
    register := taskplugin.TaskPluginRegistration{
        TaskType:    SampleTaskType,
        Name:        "SampleTaskType",
        Description: "a sample task type",
        PluginFactoryFn: func(p *taskplugin.ITaskPlugin) error {
            *p = &plugin
            // The function is declared to return error, so it must return a value.
            return nil
        },
    }
    
    err := dtf.RegisterTaskType(&register)
  4. Start the required dtf services.

    // NOTE(review): each snippet below declares err with :=, so they presumably
    // live in separate functions or processes, one per service role — confirm.
    
    // Start the task manager service.
    err := dtf.StartService(
        dtfdef.ServiceRole_Manager,
        dtf.WithMySQL(&extconfig.MySQLAddress{
            Name: "mysql", Type: "mysql", Protocol: "tcp", Address: "192.168.1.101:3306", Username: "servera", Password: "*", DB: "db_task",
        }),
        dtf.WithRedis(&extconfig.RedisAddress{
            Name: "redis", Type: "tcp", Address: "192.168.1.100:6380", Password: "*", DB: "0",
        }),
        dtf.WithMongoDB(&extconfig.MongoAddress{
            Address: "", Username: "", Password: "", Database: "", ReplicaSet: "",
        }),
    )
    
    // Start the task generator service.
    err := dtf.StartService(
        dtfdef.ServiceRole_Generator,
        dtf.WithMySQL(&extconfig.MySQLAddress{...}),
        dtf.WithRedis(&extconfig.RedisAddress{...}),
    )
    
    // Start the task scheduler service; it is given an executor invoker.
    err := dtf.StartService(
        dtfdef.ServiceRole_Scheduler,
        dtf.WithMySQL(&extconfig.MySQLAddress{...}),
        dtf.WithRedis(&extconfig.RedisAddress{...}),
        dtf.WithExecutor(serversupport.ExecutorInvoker{...}.GetInvoker()),
    )
    
    // Define the executor server.
    executorSvr := serversupport.ExecutorServer{...}
    
    // Start the executor service; it registers its handler through the
    // executor server and is given a collector invoker.
    err := dtf.StartService(
        dtfdef.ServiceRole_Executor,
        dtf.WithMySQL(&extconfig.MySQLAddress{...}),
        dtf.WithRedis(&extconfig.RedisAddress{...}),
        dtf.WithRegisterExecutorHandler(executorSvr.GetRegister()),
        dtf.WithCollector(serversupport.CollectorInvoker{...}.GetInvoker()),
    )
    
    // Start the executor server.
    executorSvr.StartServer()
    
    // Define the collector server.
    collectorSvr := serversupport.CollectorServer{...}
    
    // Start the collector service; it registers its handler through the
    // collector server.
    err := dtf.StartService(
        dtfdef.ServiceRole_Collector,
        dtf.WithMySQL(&extconfig.MySQLAddress{...}),
        dtf.WithRedis(&extconfig.RedisAddress{...}),
        dtf.WithRegisterCollectorHandler(collectorSvr.GetRegister()),
    )
    
    // Start the collector server.
    collectorSvr.StartServer()
  5. Create a task to perform your business operations.

    // Fill in the parameters of the task to create.
    taskParam := taskmodel.TaskParam{
        ...
    }
    
    // Create a task of the registered sample type; the call returns the new
    // task's ID and an error.
    taskID, err := dtf.CreateTask(
        SampleTaskType,
        taskParam,
    )
  6. Wait for the running services to exit.

    // Wait here for the started services to exit.
    dtf.Join()

About

A high-performance distributed task framework.

Resources

License

Code of conduct

Stars

Watchers

Forks

Packages

No packages published

Languages