Skip to content

Commit

Permalink
storage pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
i5heu committed Jun 4, 2024
1 parent f1fb714 commit 5981740
Show file tree
Hide file tree
Showing 12 changed files with 227 additions and 176 deletions.
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,7 @@ geomean
- [x] Basic Data Store and Retrieval
- [x] Child to Parent Index
- [ ] Data Basics
- [ ] 🚧 Data Compression
- [ ] xz
- [ ] zstd
- [ ] 🚧 Data Compression with LZMA
- [ ] Erasure coding
- [ ] Encryption
- [ ] Data Integrity Checks
Expand All @@ -251,7 +249,7 @@ geomean
- [ ] DHT for Sharding? - maybe full HT is enough
- [ ] Data Collection
- [ ] Find and Collect Data in Network
- [ ] Allow other nodes that are faster to collect data and send it to the slower with
- [ ] Allow faster nodes to collect data and send it to slower nodes, compressed with zstd

## Future Features
- [ ] Full Text Search - blevesearch/bleve
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ require (
github.com/ipfs/boxo v0.19.0
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/stretchr/testify v1.9.0
github.com/ulikunitz/xz v0.5.12
google.golang.org/protobuf v1.34.0
gopkg.in/yaml.v2 v2.4.0
)

require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudflare/circl v1.3.8 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudflare/circl v1.3.8 h1:j+V8jJt09PoeMFIu2uh5JUyEaIHTXVOHslFoLNAKqwI=
github.com/cloudflare/circl v1.3.8/go.mod h1:PDRU+oXvdD7KCtgKxW95M5Z8BpSCJXQORiZFnBQS5QU=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down Expand Up @@ -125,6 +127,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/ulikunitz/xz v0.5.12 h1:37Nm15o69RwBkXM0J6A5OlE67RZTfzUxTj8fB3dfcsc=
github.com/ulikunitz/xz v0.5.12/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E=
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f/go.mod h1:p9UJB6dDgdPgMJZs7UjUOdulKyRr9fqkS+6JKAInPy8=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
3 changes: 3 additions & 0 deletions ouroboros.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/i5heu/ouroboros-db/internal/keyValStore"
"github.com/i5heu/ouroboros-db/pkg/index"
"github.com/i5heu/ouroboros-db/pkg/storage"
"github.com/i5heu/ouroboros-db/pkg/workerPool"

"github.com/sirupsen/logrus"

Expand All @@ -38,6 +39,7 @@ type OuroborosDB struct {
Index *index.Index
log *logrus.Logger
config Config
wp *workerPool.WorkerPool
}

type Config struct {
Expand Down Expand Up @@ -94,6 +96,7 @@ func NewOuroborosDB(conf Config) (*OuroborosDB, error) {
Index: index,
config: conf,
log: conf.Logger,
wp: workerPool.NewWorkerPool(workerPool.Config{}),
}

go ou.createGarbageCollection()
Expand Down
159 changes: 30 additions & 129 deletions pkg/buzhashChunker/buzhashChunker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,148 +2,49 @@ package buzhashChunker

import (
"bytes"
"crypto/sha512"
"fmt"
"io"
"runtime"
"sync"

"github.com/i5heu/ouroboros-db/pkg/types"
chunker "github.com/ipfs/boxo/chunker"
)

func ChunkBytes(data []byte) (types.ChunkCollection, types.ChunkMetaCollection, error) {
reader := bytes.NewReader(data)
return ChunkReader(reader)
// ChunkInformation is a single chunk emitted on the chunker's result channel,
// paired with its position in the input stream.
type ChunkInformation struct {
ChunkNumber int // index of the chunk; the chunking loop counts up from 0
Data []byte // chunk bytes as returned by the buzhash chunker's NextBytes
}

func ChunkReader(reader io.Reader) (types.ChunkCollection, types.ChunkMetaCollection, error) {
bz := chunker.NewBuzhash(reader)

// get thread count
numberOfCPUs := runtime.NumCPU()
numberOfWorkers := (numberOfCPUs * 5) - 1

hashChan := make(chan chunkInformation, numberOfWorkers+1)
workerLimit := make(chan struct{}, numberOfWorkers)
var wg sync.WaitGroup
var collectorWg sync.WaitGroup

// spawn collector
resultChan := make(chan chunkResult, 1)
collectorWg.Add(1)
go collectChunkData(&collectorWg, hashChan, resultChan)

// Read and process chunks
for chunkIndex := 0; ; chunkIndex++ {
chunk, err := bz.NextBytes()
if err == io.EOF {
wg.Wait()
close(hashChan)
break
}
if err != nil {
return nil, nil, fmt.Errorf("error reading chunk: %w", err)
}

wg.Add(1)
workerLimit <- struct{}{}
go calculateSha512(&wg, hashChan, chunk, chunkIndex, workerLimit)
}

collectorWg.Wait()
close(resultChan)

finalResult, ok := <-resultChan
if !ok {
return nil, nil, fmt.Errorf("failed to read from result channel")
}

return finalResult.chunks, finalResult.meta, nil
}

func ChunkBytesSynchronously(data []byte) (types.ChunkCollection, types.ChunkMetaCollection, error) {
func ChunkBytes(data []byte) (chan ChunkInformation, chan error) {
reader := bytes.NewReader(data)
return ChunkReaderSynchronously(reader)
return ChunkReader(reader)
}

func ChunkReaderSynchronously(reader io.Reader) (types.ChunkCollection, types.ChunkMetaCollection, error) {
func ChunkReader(reader io.Reader) (chan ChunkInformation, chan error) {
resultChan := make(chan ChunkInformation, 20)
errorChan := make(chan error)
bz := chunker.NewBuzhash(reader)

var chunks types.ChunkCollection
var meta types.ChunkMetaCollection

for chunkIndex := 0; ; chunkIndex++ {
chunk, err := bz.NextBytes()
if err == io.EOF {
break
go func() {
defer close(resultChan)
defer close(errorChan)

for chunkIndex := 0; ; chunkIndex++ {
chunk, err := bz.NextBytes()
if err == io.EOF {
break
}
if err != nil {
errorChan <- fmt.Errorf("error reading chunk: %w", err)
return
}

chunkData := ChunkInformation{
ChunkNumber: chunkIndex,
Data: chunk,
}

resultChan <- chunkData
}
if err != nil {
return nil, nil, fmt.Errorf("error reading chunk: %w", err)
}

hash := sha512.Sum512(chunk)
chunkData := types.Chunk{
ChunkMeta: types.ChunkMeta{
Hash: hash,
DataLength: uint32(len(chunk)),
},
Data: chunk,
}
chunks = append(chunks, chunkData)
meta = append(meta, chunkData.ChunkMeta)
}

return chunks, meta, nil
}

type chunkInformation struct {
chunkNumber int
hash types.Hash
data []byte
}

type chunkResult struct {
chunks types.ChunkCollection
meta types.ChunkMetaCollection
}

func collectChunkData(collectorWg *sync.WaitGroup, chunkChan <-chan chunkInformation, resultChan chan<- chunkResult) {
defer collectorWg.Done()

chunkMap := map[int]types.Chunk{}

for hashInfo := range chunkChan {
chunkMap[hashInfo.chunkNumber] = types.Chunk{
ChunkMeta: types.ChunkMeta{
Hash: hashInfo.hash,
DataLength: uint32(len(hashInfo.data)),
},
Data: hashInfo.data,
}
}

chunks := make(types.ChunkCollection, len(chunkMap))
meta := make(types.ChunkMetaCollection, len(chunkMap))
for i := 0; i < len(chunkMap); i++ {
chunks[i] = chunkMap[i]
meta[i] = chunkMap[i].ChunkMeta
}

resultChan <- chunkResult{
chunks: chunks,
meta: meta,
}
}

func calculateSha512(wg *sync.WaitGroup, hashChan chan<- chunkInformation, data []byte, chunkNumber int, workerLimit chan struct{}) {
defer wg.Done()
}()

hash := sha512.Sum512(data)
hashChan <- chunkInformation{
chunkNumber: chunkNumber,
hash: hash,
data: data,
}
<-workerLimit
return resultChan, errorChan
}
51 changes: 11 additions & 40 deletions pkg/buzhashChunker/buzhashChunker_test.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,27 @@
package buzhashChunker

import (
"crypto/sha512"
"testing"

"github.com/i5heu/ouroboros-db/pkg/types"
)

func TestChunkBytes(t *testing.T) {
// Initialize variables for testing
var input []byte = []byte("Hello World")
expectedHash := sha512.Sum512([]byte("Hello World"))
var expectedChunk types.Chunk = types.Chunk{
ChunkMeta: types.ChunkMeta{
Hash: expectedHash,
DataLength: uint32(len(input)),
},
Data: input,
}
var expectedChunks types.ChunkCollection = types.ChunkCollection{expectedChunk}
var expectedMeta types.ChunkMetaCollection = types.ChunkMetaCollection{expectedChunk.ChunkMeta}

// Call the function with the test input
chunks, meta, err := ChunkBytes(input)
if err != nil {
t.Errorf("ChunkBytes(%s) returned an error: %v", input, err)
expectedChunk := ChunkInformation{
ChunkNumber: 0,
Data: []byte("Hello World"),
}

// Check the function's output against the expected output
if len(chunks) != len(expectedChunks) {
t.Errorf("ChunkBytes(%s) returned %d chunks, expected %d", input, len(chunks), len(expectedChunks))
}

if len(meta) != len(expectedMeta) {
t.Errorf("ChunkBytes(%s) returned %d metadata chunks, expected %d", input, len(meta), len(expectedMeta))
}
// Call the function
resultChan, _ := ChunkBytes(input)
result := <-resultChan

for i, chunk := range chunks {
if chunk.Hash != expectedChunks[i].Hash {
t.Errorf("ChunkBytes(%s)[%d].Hash = %x, expected %x", input, i, chunk.Hash, expectedChunks[i].Hash)
}
if string(chunk.Data) != string(expectedChunks[i].Data) {
t.Errorf("ChunkBytes(%s)[%d].Data = %s, expected %s", input, i, chunk.Data, expectedChunks[i].Data)
}
// Check the result
if result.ChunkNumber != expectedChunk.ChunkNumber {
t.Errorf("Expected chunk number %d, got %d", expectedChunk.ChunkNumber, result.ChunkNumber)
}

for i, metaChunk := range meta {
if metaChunk.Hash != expectedMeta[i].Hash {
t.Errorf("ChunkBytes(%s) Metadata[%d].Hash = %x, expected %x", input, i, metaChunk.Hash, expectedMeta[i].Hash)
}
if metaChunk.DataLength != expectedMeta[i].DataLength {
t.Errorf("ChunkBytes(%s) Metadata[%d].DataLength = %d, expected %d", input, i, metaChunk.DataLength, expectedMeta[i].DataLength)
}
if string(result.Data) != string(expectedChunk.Data) {
t.Errorf("Expected data %s, got %s", string(expectedChunk.Data), string(result.Data))
}
}
25 changes: 25 additions & 0 deletions pkg/messanger/messanger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package messanger

import (
"crypto"

"github.com/i5heu/ouroboros-db/pkg/types"
)

// Identity groups a hash, a public key and two root signatures made with
// different signature schemes (see the per-field notes).
// NOTE(review): what Hash is computed over and what exactly the signatures
// cover is not visible in this file — confirm against the code that fills it.
type Identity struct {
Hash types.Hash
// NOTE(review): crypto.PublicKey is itself an interface type; a pointer to
// an interface is rarely needed in Go — confirm this is intentional.
PublicKey *crypto.PublicKey
RootSignature1 []byte // ed25519
RootSignature2 []byte // dilithium/mode5 - post quantum
}

// RootCertificate holds a root public key.
// NOTE(review): presumably the key used to verify the root signatures stored
// in Identity — confirm; nothing in this file uses it yet.
type RootCertificate struct {
RootPublicKey *crypto.PublicKey
}

// selfIdentity holds a private key. The type and its field are unexported,
// so they are only accessible inside this package.
// NOTE(review): presumably the local node's own key material — confirm.
type selfIdentity struct {
privatekey *crypto.PrivateKey
}

// Message is currently an empty placeholder type; no fields are defined yet.
type Message struct {
}
12 changes: 12 additions & 0 deletions pkg/messanger/messangerService.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package messanger

// MessangerService declares the operations a messaging implementation must
// provide. Only signatures are defined here; the per-method notes below are
// inferred from names and parameters and should be confirmed against a
// concrete implementation.
type MessangerService interface {
// Init presumably prepares the service for use — TODO confirm.
Init() error
// SendMessage takes a message string and no recipient; presumably sends it
// to a default or all-peers target — TODO confirm.
SendMessage(message string) error
// SendMessageToPeer takes a peer identifier and a message string.
// NOTE(review): the format of peer (address, ID, ...) is not specified here.
SendMessageToPeer(peer string, message string) error
// DistributeMessage takes a message and a successor count; presumably the
// message is forwarded to numberOfSuccessors peers — TODO confirm.
DistributeMessage(message string, numberOfSuccessors int) error
// GetAllPeers returns peers as strings, or an error.
GetAllPeers() ([]string, error)
// MessageHandler accepts a callback that receives a message string and may
// return an error; presumably it is invoked for incoming messages — TODO confirm.
MessageHandler(handler func(message string) error)
// AddPeer takes a peer identifier string and reports failure via error.
AddPeer(peer string) error
// Close returns nothing, so failures during shutdown cannot be reported to the caller.
Close()
}
6 changes: 4 additions & 2 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"sync"

"github.com/i5heu/ouroboros-db/internal/keyValStore"
"github.com/i5heu/ouroboros-db/pkg/buzhashChunker"
"github.com/i5heu/ouroboros-db/pkg/types"
"github.com/i5heu/ouroboros-db/pkg/workerPool"
)

type Storage struct {
kv *keyValStore.KeyValStore
wp *workerPool.WorkerPool
}

type StoreFileOptions struct {
Expand All @@ -21,6 +22,7 @@ type StoreFileOptions struct {
File []byte
Temporary types.Binary
FullTextSearch types.Binary
WorkerPool *workerPool.WorkerPool
}

func NewStorage(kv *keyValStore.KeyValStore) StorageService {
Expand Down Expand Up @@ -158,7 +160,7 @@ func (s *Storage) storeDataInChunkStore(data []byte) (types.ChunkMetaCollection,
return nil, fmt.Errorf("Error storing data: Data is empty")
}

chunks, _, err := buzhashChunker.ChunkBytes(data)
chunks, _, err := s.StoreDataPipeline(data)
if err != nil {
log.Fatalf("Error chunking data: %v", err)
return nil, err
Expand Down
Loading

0 comments on commit 5981740

Please sign in to comment.