Skip to content

Commit

Permalink
add workPool
Browse files Browse the repository at this point in the history
  • Loading branch information
i5heu committed Jun 4, 2024
1 parent a1aacdc commit f49e18d
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 4 deletions.
28 changes: 24 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ A embedded database built around the concept of event trees, emphasizing data de
- [Benchmark Versions](#benchmark-versions)
- [OuroborosDB Performance Version Differences](#ouroborosdb-performance-version-differences)
- [OuroborosDB Performance Changelog](#ouroborosdb-performance-changelog)
- [Current Features](#current-features)
- [1.0.0 Features](#100-features)
- [Future Features](#future-features)
- [Current Problems and things to research:](#current-problems-and-things-to-research)
- [Name and Logo](#name-and-logo)
Expand Down Expand Up @@ -228,14 +228,34 @@ geomean
- **v0.0.2** - Create tests and benchmarks


## Current Features
## 1.0.0 Features
🚧 = currently in development

- [x] Data Deduplication
- [x] Basic Data Store and Retrieval
- [ ] Data Integrity Checks
- [x] Child to Parent Index
- [ ] Data Basics
- [ ] 🚧 Data Compression
- [ ] xz
- [ ] zstd
- [ ] Erasure coding
- [ ] Encryption
- [ ] Data Integrity Checks
- [ ] Distributed System
- [ ] 🚧 Bootstrap System
- [ ] Authentication
- [ ] Message Distribution
- [ ] Broadcast
- [ ] Unicast
- [ ] Data Replication
- [ ] DHT for Sharding? - maybe full HT is enough
- [ ] Data Collection
- [ ] Find and Collect Data in Network
- [ ] Allow other nodes that are faster to collect data and send it to the slower ones

## Future Features
- [ ] Full Text Search
- [ ] Full Text Search - blevesearch/bleve
- [ ] Semantic Search with API requests for Embeddings
- [ ] Is the deletion of not Temporary Events a good idea?
- Maybe if only some superUser can delete them with a key or something.
- [ ] It would be nice to have pipelines that can run custom JS or webassembly to do arbitrary things.
Expand Down
47 changes: 47 additions & 0 deletions cmd/workerPoolTester/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"fmt"

workerpool "github.com/i5heu/ouroboros-db/pkg/workerPool"
)

// funcResult pairs a computed value with an error, mimicking the
// (value, error) shape a real workload result would have.
type funcResult struct {
	bob int   // the computed value (here: the square of the input)
	err error // always populated by this tester's tasks
}

// main stress-tests the workerPool package: it submits 50 000 squaring
// tasks to a single room, drains the results with the async collector,
// and prints a sample entry to prove the round trip worked.
func main() {
	work := make([]int, 0, 50000)
	for i := 0; i < 50000; i++ {
		work = append(work, i)
	}

	wp := workerpool.NewWorkerPool(workerpool.Config{GlobalBuffer: 1000})
	room := wp.CreateRoom(1000)

	// Start draining results concurrently so the producer loop below
	// never blocks on a full room buffer.
	room.AsyncCollector()

	for _, w := range work {
		task := w // capture per iteration (required before Go 1.22)
		room.NewTaskWaitForFreeSlot(func() interface{} {
			return funcResult{
				bob: task * task,
				err: fmt.Errorf("Hello World"),
			}
		})
	}

	result, err := room.GetAsyncResults()
	if err != nil {
		fmt.Println(err)
		return // bug fix: was falling through with an unusable result
	}

	// Bug fix: guard the sample lookup. The original indexed result[500]
	// unconditionally and, on a failed type assertion, still dereferenced
	// the zero-value funcResult (nil err -> panic).
	if len(result) <= 500 {
		fmt.Println("not enough results")
		return
	}
	field, ok := result[500].(funcResult)
	if !ok {
		fmt.Println("not ok")
		return
	}

	fmt.Println(len(result), field.err.Error())
}
145 changes: 145 additions & 0 deletions pkg/workerPool/workerPool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package workerpool

import (
"fmt"
"runtime"
"sync"
"sync/atomic"
)

// WorkerPool owns a fixed set of worker goroutines that all drain one
// shared, buffered task queue. Each Task carries a pointer to the Room
// that submitted it, so results are routed back to the right collector.
type WorkerPool struct {
	config    Config    // effective configuration after defaulting in NewWorkerPool
	taskQueue chan Task // global queue consumed by every worker goroutine
	// NOTE(review): removed the original `resultRegistry map[int]chan interface{}`
	// field — it was never read or written anywhere in this file.
}

// Config controls pool sizing. Zero or negative values are replaced by
// defaults in NewWorkerPool (3×NumCPU workers, 10000 queue slots).
type Config struct {
	WorkerCount  int // number of worker goroutines to start
	GlobalBuffer int // capacity of the shared task queue
}

// Room groups a batch of related tasks submitted to one WorkerPool and
// gathers their results. Contains sync primitives, so a Room must not
// be copied; use the *Room returned by CreateRoom.
type Room struct {
	result               []interface{}    // accumulated by the AsyncCollector goroutine
	resultMutex          sync.Mutex       // guards result between collector and GetAsyncResults
	asyncCollectorWait   sync.WaitGroup   // lets GetAsyncResults wait for the collector to finish
	asyncCollectorActive atomic.Bool      // guard so only one collector goroutine runs
	bufferSize           int              // capacity requested for resultChan
	resultChan           chan interface{} // per-room buffered channel workers send results into
	wg                   sync.WaitGroup   // counts submitted-but-unfinished tasks
	wp                   *WorkerPool      // the pool this room submits to
}

// Task is one unit of work on the pool's global queue: the closure to
// execute plus the Room whose resultChan receives its return value.
type Task struct {
	run  func() interface{} // user-supplied job; its return value is the result
	room *Room              // destination for the result and Done() call
}

// NewWorkerPool builds a pool, applies defaults for any non-positive
// config values (worker count = 3×NumCPU, global buffer = 10000), and
// starts the worker goroutines immediately.
//
// The workers run for the life of the process — there is no shutdown
// mechanism — so create the pool once and reuse it.
func NewWorkerPool(config Config) *WorkerPool {
	if config.WorkerCount < 1 {
		// Oversubscribe: three workers per logical CPU.
		config.WorkerCount = runtime.NumCPU() * 3
	}
	if config.GlobalBuffer < 1 {
		config.GlobalBuffer = 10000
	}

	pool := &WorkerPool{
		config:    config,
		taskQueue: make(chan Task, config.GlobalBuffer),
	}

	for n := 0; n < config.WorkerCount; n++ {
		go pool.Worker()
	}

	return pool
}

// Worker is the run loop of one pool goroutine: it pulls tasks from the
// shared queue, executes them, sends the result to the owning room's
// channel, and marks the task done on that room's WaitGroup. The loop
// ends only if the task queue is ever closed.
func (wp *WorkerPool) Worker() {
	for task := range wp.taskQueue {
		out := task.run()
		task.room.resultChan <- out
		task.room.wg.Done()
	}
}

// CreateRoom returns a new Room bound to this pool. size is the
// capacity of the room's result buffer; if more than size results pile
// up without a collector draining them, workers block on the send.
func (wp *WorkerPool) CreateRoom(size int) *Room {
	// Idiom fix: dropped the explicit `asyncCollectorWait: sync.WaitGroup{}`
	// initializer — the zero value of sync.WaitGroup is ready to use.
	return &Room{
		bufferSize: size,
		resultChan: make(chan interface{}, size),
		wp:         wp,
	}
}

// NewTaskWaitForFreeSlot submits job to the pool's global queue,
// blocking until a queue slot is free. The room's WaitGroup is
// incremented before the send so WaitAndClose can never observe a
// queued-but-uncounted task.
func (ro *Room) NewTaskWaitForFreeSlot(job func() interface{}) {
	ro.wg.Add(1)
	ro.wp.taskQueue <- Task{
		run:  job,
		room: ro,
	}
}

// NewTask submits job without intentionally blocking: it refuses the
// task with an error when either the global queue or the room's result
// buffer appears full.
//
// NOTE: the len/cap checks are advisory (check-then-act): another
// goroutine may fill a buffer between the check and the send, in which
// case this call degrades into a blocking submit.
func (ro *Room) NewTask(job func() interface{}) error {
	if len(ro.wp.taskQueue) == cap(ro.wp.taskQueue) {
		// Style fix (staticcheck ST1005): error strings are lowercase
		// and unpunctuated.
		return fmt.Errorf("global buffer is full: wait for tasks to finish or increase the buffer size")
	}

	if len(ro.resultChan) == cap(ro.resultChan) {
		return fmt.Errorf("room buffer is full: wait for tasks to finish or increase the buffer size")
	}

	ro.NewTaskWaitForFreeSlot(job)

	return nil
}

// Collect blocks until every submitted task has run, then returns all
// results in completion order. It triggers WaitAndClose, which closes
// the room's result channel — so a room can be collected exactly once;
// do not combine Collect with AsyncCollector/GetAsyncResults on the
// same room.
func (ro *Room) Collect() []interface{} {
	// Closing the channel (once wg drains) is what terminates the range
	// loop below.
	go ro.WaitAndClose()

	results := make([]interface{}, 0)
	for r := range ro.resultChan {
		results = append(results, r)
	}

	return results
}

// AsyncCollector starts at most one background goroutine that drains
// the room's result channel into ro.result. It returns immediately;
// call GetAsyncResults to wait for and fetch the collected values. The
// drain goroutine exits when WaitAndClose closes the result channel.
func (ro *Room) AsyncCollector() {
	// Race fix: the original used a separate Load then Store, so two
	// concurrent callers could both pass the guard and start two
	// collectors. CompareAndSwap makes the claim atomic.
	if !ro.asyncCollectorActive.CompareAndSwap(false, true) {
		return
	}

	ro.asyncCollectorWait.Add(1)

	go func() {
		// Defers run LIFO: Done fires first, then the active flag clears.
		defer ro.asyncCollectorActive.Store(false)
		defer ro.asyncCollectorWait.Done()

		// Hold the mutex across the whole drain so GetAsyncResults
		// cannot read ro.result while it is still being appended to.
		ro.resultMutex.Lock()
		for result := range ro.resultChan {
			ro.result = append(ro.result, result)
		}
		ro.resultMutex.Unlock()
	}()
}

// GetAsyncResults finishes an AsyncCollector run: it closes the result
// channel once all tasks are done (via WaitAndClose), waits for the
// collector goroutine to drain it, and returns everything collected.
// The returned error is currently always nil.
//
// NOTE(review): calling this (or Collect) more than once on the same
// room panics, because WaitAndClose closes resultChan each time.
func (ro *Room) GetAsyncResults() ([]interface{}, error) {
	// Closing the channel lets the collector's range loop terminate.
	go ro.WaitAndClose()
	// Block until the collector goroutine has appended its last result.
	ro.asyncCollectorWait.Wait()

	ro.resultMutex.Lock()
	defer ro.resultMutex.Unlock()

	return ro.result, nil
}

// WaitAndClose blocks until every task submitted to this room has
// finished (the WaitGroup drains), then closes the result channel so
// Collect / AsyncCollector range loops terminate.
//
// NOTE(review): not idempotent — a second call panics on the double
// close. Both Collect and GetAsyncResults invoke it, so use only one
// collection path per room.
func (ro *Room) WaitAndClose() {
	ro.wg.Wait()
	close(ro.resultChan)
}

0 comments on commit f49e18d

Please sign in to comment.