From f49e18dd3f72972d931b8ac59e5697defd4420c5 Mon Sep 17 00:00:00 2001 From: i5heu Date: Tue, 4 Jun 2024 15:19:14 +0000 Subject: [PATCH] add workPool --- README.md | 28 ++++++- cmd/workerPoolTester/main.go | 47 ++++++++++++ pkg/workerPool/workerPool.go | 145 +++++++++++++++++++++++++++++++++++ 3 files changed, 216 insertions(+), 4 deletions(-) create mode 100644 cmd/workerPoolTester/main.go create mode 100644 pkg/workerPool/workerPool.go diff --git a/README.md b/README.md index f0f765d..140de6e 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ A embedded database built around the concept of event trees, emphasizing data de - [Benchmark Versions](#benchmark-versions) - [OuroborosDB Performance Version Differences](#ouroborosdb-performance-version-differences) - [OuroborosDB Performance Changelog](#ouroborosdb-performance-changelog) - - [Current Features](#current-features) + - [1.0.0 Features](#100-features) - [Future Features](#future-features) - [Current Problems and things to research:](#current-problems-and-things-to-research) - [Name and Logo](#name-and-logo) @@ -228,14 +228,34 @@ geomean - **v0.0.2** - Create tests and benchmarks -## Current Features +## 1.0.0 Features +🚧 = currently in development + - [x] Data Deduplication - [x] Basic Data Store and Retrieval -- [ ] Data Integrity Checks - [x] Child to Parent Index +- [ ] Data Basics + - [ ] 🚧 Data Compression + - [ ] xz + - [ ] zstd + - [ ] Erasure coding + - [ ] Encryption + - [ ] Data Integrity Checks +- [ ] Distributed System + - [ ] 🚧 Bootstrap System + - [ ] Authentication + - [ ] Message Distribution + - [ ] Broadcast + - [ ] Unicast + - [ ] Data Replication + - [ ] DHT for Sharding? - maybe full HT is enough + - [ ] Data Collection + - [ ] Find and Collect Data in Network + - [ ] Allow other nodes that are faster to collect data and send it to the slower with ## Future Features -- [ ] Full Text Search +- [ ] Full Text Search - blevesearch/bleve + - [ ] Semantic Search with API requests for Embeddings - [ ] Is the deletion of not Temporary Events a good idea? - Maybe if only some superUser can delete them with a key or something. - [ ] It would be nice to have pipelines that can run custom JS or webassembly to do arbitrary things. diff --git a/cmd/workerPoolTester/main.go b/cmd/workerPoolTester/main.go new file mode 100644 index 0000000..ffdda13 --- /dev/null +++ b/cmd/workerPoolTester/main.go @@ -0,0 +1,47 @@ +package main + +import ( + "fmt" + + workerpool "github.com/i5heu/ouroboros-db/pkg/workerPool" +) + +type funcResult struct { + bob int + err error +} + +func main() { + work := []int{} + + for i := 0; i < 50000; i++ { + work = append(work, i) + } + + wp := workerpool.NewWorkerPool(workerpool.Config{GlobalBuffer: 1000}) + room := wp.CreateRoom(1000) + + room.AsyncCollector() + + for _, w := range work { + task := w + room.NewTaskWaitForFreeSlot(func() interface{} { + return funcResult{ + bob: task * task, + err: fmt.Errorf("Hello World"), + } + }) + } + + result, err := room.GetAsyncResults() + if err != nil { + fmt.Println(err) + } + + field, ok := result[500].(funcResult) + if !ok { + fmt.Println("not ok") + } + + fmt.Println(len(result), field.err.Error()) +} diff --git a/pkg/workerPool/workerPool.go b/pkg/workerPool/workerPool.go new file mode 100644 index 0000000..c5acce0 --- /dev/null +++ b/pkg/workerPool/workerPool.go @@ -0,0 +1,145 @@ +package workerpool + +import ( + "fmt" + "runtime" + "sync" + "sync/atomic" +) + +type WorkerPool struct { + config Config + taskQueue chan Task + resultRegistry map[int]chan interface{} +} + +type Config struct { + WorkerCount int + GlobalBuffer int +} + +type Room struct { + result []interface{} + resultMutex sync.Mutex + asyncCollectorWait sync.WaitGroup + asyncCollectorActive atomic.Bool + bufferSize int + resultChan chan interface{} + wg sync.WaitGroup + wp *WorkerPool +} + +type Task struct { + run func() interface{} + room *Room +} + +func NewWorkerPool(config Config) *WorkerPool { + if config.WorkerCount < 1 { + numberOfCPUs := runtime.NumCPU() + numberOfWorkers := (numberOfCPUs * 3) + config.WorkerCount = numberOfWorkers + } + + if config.GlobalBuffer < 1 { + config.GlobalBuffer = 10000 + } + + taskQueue := make(chan Task, config.GlobalBuffer) + + wp := &WorkerPool{ + config: config, + taskQueue: taskQueue, + } + + for i := 0; i < config.WorkerCount; i++ { + go wp.Worker() + } + + return wp +} + +func (wp *WorkerPool) Worker() { + for t := range wp.taskQueue { + t.room.resultChan <- t.run() + t.room.wg.Done() + } +} + +func (wp *WorkerPool) CreateRoom(size int) *Room { + return &Room{ + bufferSize: size, + resultChan: make(chan interface{}, size), + wp: wp, + asyncCollectorWait: sync.WaitGroup{}, + } +} + +func (ro *Room) NewTaskWaitForFreeSlot(job func() interface{}) { + task := Task{ + run: job, + room: ro, + } + ro.wg.Add(1) + ro.wp.taskQueue <- task +} + +func (ro *Room) NewTask(job func() interface{}) error { + if len(ro.wp.taskQueue) == cap(ro.wp.taskQueue) { + return fmt.Errorf("Global buffer is full. Please wait for some tasks to finish. Or increase the buffer size.") + } + + if len(ro.resultChan) == cap(ro.resultChan) { + return fmt.Errorf("Room buffer is full. Please wait for some tasks to finish. Or increase the buffer size.") + } + + ro.NewTaskWaitForFreeSlot(job) + + return nil +} + +func (ro *Room) Collect() []interface{} { + go ro.WaitAndClose() + results := make([]interface{}, 0) + + for result := range ro.resultChan { + results = append(results, result) + } + + return results +} + +func (ro *Room) AsyncCollector() { + if ro.asyncCollectorActive.Load() { + return + } + + ro.asyncCollectorActive.Store(true) + ro.asyncCollectorWait.Add(1) + + go func() { + defer ro.asyncCollectorActive.Store(false) + defer ro.asyncCollectorWait.Done() + + ro.resultMutex.Lock() + for result := range ro.resultChan { + ro.result = append(ro.result, result) + } + ro.resultMutex.Unlock() + }() +} + +func (ro *Room) GetAsyncResults() ([]interface{}, error) { + go ro.WaitAndClose() + ro.asyncCollectorWait.Wait() + + ro.resultMutex.Lock() + defer ro.resultMutex.Unlock() + + return ro.result, nil +} + +func (ro *Room) WaitAndClose() { + ro.wg.Wait() + close(ro.resultChan) +}