Skip to content

Commit

Permalink
generate bucket car bundler
Browse files Browse the repository at this point in the history
  • Loading branch information
alvin-reyes committed May 25, 2023
1 parent 7913f52 commit 0800483
Show file tree
Hide file tree
Showing 13 changed files with 85 additions and 51 deletions.
6 changes: 3 additions & 3 deletions api/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type UploadResponse struct {
}

func ConfigurePinningRouter(e *echo.Group, node *core.LightNode) {
var DeltaUploadApi = node.Config.ExternalApi.ApiUrl
var DeltaUploadApi = node.Config.ExternalApi.DeltaNodeApiUrl
content := e.Group("/content")
content.POST("/add", handleUploadToCarBucketAndMiners(node, DeltaUploadApi))
content.POST("/add-ipfs", handleUploadFromCidAndMiners(node, DeltaUploadApi))
Expand Down Expand Up @@ -217,7 +217,7 @@ func handleUploadFromCidAndMiners(node *core.LightNode, DeltaUploadApi string) f

if makeDeal == "true" {
job := jobs.CreateNewDispatcher()
job.AddJob(jobs.NewBucketAggregator(node, newContent, addNode))
job.AddJob(jobs.NewBucketAggregator(node, newContent, addNode, false))
job.Start(1)
}

Expand Down Expand Up @@ -415,7 +415,7 @@ func handleUploadToCarBucketAndMiners(node *core.LightNode, DeltaUploadApi strin

if makeDeal == "true" {
job := jobs.CreateNewDispatcher()
job.AddJob(jobs.NewBucketAggregator(node, newContent, srcR))
job.AddJob(jobs.NewBucketAggregator(node, newContent, srcR, false))
job.Start(1)
}

Expand Down
5 changes: 3 additions & 2 deletions cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func DaemonCmd(cfg *config.DeltaConfig) []*cli.Command {
`)
fmt.Println("Cleaning up and retrying...")
cleanUpAndRetry(ln)
//runProcessors(ln)
fmt.Println("Cleaning up and retrying... Done")

fmt.Println("Starting API server")
Expand All @@ -100,8 +101,8 @@ func DaemonCmd(cfg *config.DeltaConfig) []*cli.Command {
}

func runProcessors(ln *core.LightNode) {
dealCheckFreq := ln.Config.Common.DealCheck
dealCheckFreqTick := time.NewTicker(time.Duration(dealCheckFreq) * time.Second)
dealCheckFreq := 12
dealCheckFreqTick := time.NewTicker(time.Duration(dealCheckFreq) * time.Hour)

for {
select {
Expand Down
4 changes: 2 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ type DeltaConfig struct {
}

ExternalApi struct {
ApiUrl string `env:"DELTA_NODE_API" envDefault:"http://localhost:1414"`
AuthSvcUrl string `env:"AUTH_SVC_API" envDefault:"https://auth.estuary.tech"`
DeltaNodeApiUrl string `env:"DELTA_NODE_API" envDefault:"http://localhost:1414"`
AuthSvcUrl string `env:"AUTH_SVC_API" envDefault:"https://auth.estuary.tech"`
}
}

Expand Down
63 changes: 33 additions & 30 deletions core/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,38 +62,41 @@ type LogEvent struct {
}

type Bundle struct {
ID int64 `gorm:"primaryKey"`
Uuid string `gorm:"index" json:"uuid"`
Name string `json:"name"`
Size int64 `json:"size"`
DeltaContentId int64 `json:"delta_content_id"`
DeltaNodeUrl string `json:"delta_node_url"`
RequestingApiKey string `json:"requesting_api_key,omitempty"`
Miner string `json:"miner"`
Cid string `json:"cid"`
Status string `json:"status"` // open, processing, filled, uploaded-to-delta
LastMessage string `json:"last_message"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
ID int64 `gorm:"primaryKey"`
Uuid string `gorm:"index" json:"uuid"`
Name string `json:"name"`
Size int64 `json:"size"`
DeltaContentId int64 `json:"delta_content_id"`
DeltaNodeUrl string `json:"delta_node_url"`
RequestingApiKey string `json:"requesting_api_key,omitempty"`
Miner string `json:"miner"`
FileCid string `json:"file_cid"`
AggregatePieceCid string `json:"aggregate_piece_cid"`
InclusionProof string `json:"inclusion_proof"`
Status string `json:"status"` // open, processing, filled, uploaded-to-delta
LastMessage string `json:"last_message"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type Bucket struct {
ID int64 `gorm:"primaryKey"`
Uuid string `gorm:"index" json:"uuid"`
BundleUuid string `gorm:"index" json:"bundle_uuid"`
Name string `json:"name"`
Size int64 `json:"size"`
RequestingApiKey string `json:"requesting_api_key,omitempty"`
DeltaContentId int64 `json:"delta_content_id"`
DeltaNodeUrl string `json:"delta_node_url"`
Miner string `json:"miner"`
PieceCid string `json:"piece_cid"`
PieceSize int64 `json:"piece_size"`
InclusionProof string `json:"inclusion_proof"`
Cid string `json:"cid"`
Status string `json:"status"` // open, processing, filled, bundled
LastMessage string `json:"last_message"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
ID int64 `gorm:"primaryKey" json:"id,omitempty"`
Uuid string `gorm:"index" json:"uuid" json:"uuid,omitempty"`
BundleUuid string `gorm:"index" json:"bundle_uuid" json:"bundle_uuid,omitempty"`
Name string `json:"name" json:"name,omitempty"`
Size int64 `json:"size" json:"size,omitempty"`
RequestingApiKey string `json:"requesting_api_key,omitempty" json:"requesting_api_key,omitempty"`
DeltaContentId int64 `json:"delta_content_id" json:"delta_content_id,omitempty"`
DeltaNodeUrl string `json:"delta_node_url" json:"delta_node_url,omitempty"`
Miner string `json:"miner" json:"miner,omitempty"`
PieceCid string `json:"piece_cid" json:"piece_cid,omitempty"`
PieceSize int64 `json:"piece_size" json:"piece_size,omitempty"`
CommPa string `json:"comm_pa,omitempty"`
SizePa int64 `json:"size_pa,omitempty"`
Cid string `json:"cid" json:"cid,omitempty"`
Status string `json:"status" json:"status,omitempty"` // open, processing, filled, bundled
LastMessage string `json:"last_message" json:"last_message,omitempty"`
CreatedAt time.Time `json:"created_at" json:"created_at"`
UpdatedAt time.Time `json:"updated_at" json:"updated_at"`
}

// main content record
Expand Down
6 changes: 4 additions & 2 deletions jobs/bucket_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import (
)

type BucketAggregator struct {
Force bool `json:"force"`
Content core.Content `json:"content"`
File io.Reader `json:"file"`
Processor
}

func NewBucketAggregator(ln *core.LightNode, contentToProcess core.Content, fileNode io.Reader) IProcessor {
func NewBucketAggregator(ln *core.LightNode, contentToProcess core.Content, fileNode io.Reader, force bool) IProcessor {
return &BucketAggregator{
force,
contentToProcess,
fileNode,
Processor{
Expand Down Expand Up @@ -60,7 +62,7 @@ func (r *BucketAggregator) Run() error {
}
}

if totalSize > r.LightNode.Config.Common.AggregateSize && len(content) > 1 {
if r.Force || totalSize > r.LightNode.Config.Common.AggregateSize && len(content) > 1 {
bucket.Status = "processing"
r.LightNode.DB.Save(&bucket)

Expand Down
32 changes: 30 additions & 2 deletions jobs/bucket_car_bundler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (b BucketCarBundler) Run() error {
intTotalSize += bucket.PieceSize

bucket.BundleUuid = bundle.Uuid
err = b.LightNode.DB.Save(&bucket).Error
if err != nil {
panic(err)
}
Expand All @@ -72,7 +73,7 @@ func (b BucketCarBundler) Run() error {

bundle.Status = "processing"
b.LightNode.DB.Save(&bundle)

//create the aggregate object
a, err := datasegment.NewAggregate(abi.PaddedPieceSize(totalSizePow2), subPieceInfos)
if err != nil {
Expand Down Expand Up @@ -118,10 +119,37 @@ func (b BucketCarBundler) Run() error {
panic(err)
}

bundle.Cid = rootBundle.Cid().String()
bundle.FileCid = rootBundle.Cid().String()
bundle.AggregatePieceCid = cidPC.String()
bundle.Status = "filled"
bundle.Size = int64(a.DealSize)
bundle.DeltaNodeUrl = b.LightNode.Config.ExternalApi.DeltaNodeApiUrl
bundle.CreatedAt = time.Now()
bundle.UpdatedAt = time.Now()
b.LightNode.DB.Save(&bundle)

// update the bucket with proof piece info
for _, bucketX := range buckets {
bucketPieceCid, err := cid.Decode(bucketX.PieceCid)
if err != nil {
panic(err)
}

pieceInfo := abi.PieceInfo{
Size: abi.PaddedPieceSize(bucketX.PieceSize),
PieceCID: bucketPieceCid,
}
proofForEach, err := a.ProofForPieceInfo(pieceInfo)
aux, err := proofForEach.ComputeExpectedAuxData(datasegment.VerifierDataForPieceInfo(pieceInfo))

bucketX.CommPa = aux.CommPa.String()
bucketX.SizePa = int64(aux.SizePa)
if err != nil {
panic(err)
}

}

job := CreateNewDispatcher()
job.AddJob(NewUploadBundleToDeltaProcessor(b.LightNode, rootReader, bundle))
job.Start(1)
Expand Down
2 changes: 1 addition & 1 deletion jobs/bucket_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (r *BucketChecker) Info() error {
}

func NewBucketChecker(ln *core.LightNode, bucket core.Bucket) IProcessor {
DELTA_UPLOAD_API = ln.Config.ExternalApi.ApiUrl
DELTA_UPLOAD_API = ln.Config.ExternalApi.DeltaNodeApiUrl
return &BucketChecker{
Bucket: bucket,
Processor: Processor{
Expand Down
2 changes: 1 addition & 1 deletion jobs/bundle_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (r *BundleChecker) Info() error {
}

func NewBundleChecker(ln *core.LightNode, bucket core.Bundle) IProcessor {
DELTA_UPLOAD_API = ln.Config.ExternalApi.ApiUrl
DELTA_UPLOAD_API = ln.Config.ExternalApi.DeltaNodeApiUrl
return &BundleChecker{
Bundle: bucket,
Processor: Processor{
Expand Down
2 changes: 1 addition & 1 deletion jobs/deal_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (r *DealChecker) Info() error {
}

func NewDealChecker(ln *core.LightNode) IProcessor {
DELTA_UPLOAD_API = ln.Config.ExternalApi.ApiUrl
DELTA_UPLOAD_API = ln.Config.ExternalApi.DeltaNodeApiUrl
return &DealChecker{
Processor{
LightNode: ln,
Expand Down
2 changes: 1 addition & 1 deletion jobs/deal_item_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (r *DealItemChecker) Info() error {
}

func NewDealItemChecker(ln *core.LightNode, content core.Content) IProcessor {
DELTA_UPLOAD_API = ln.Config.ExternalApi.ApiUrl
DELTA_UPLOAD_API = ln.Config.ExternalApi.DeltaNodeApiUrl
return &DealItemChecker{
Content: content,
Processor: Processor{
Expand Down
2 changes: 1 addition & 1 deletion jobs/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ type DealE2EUploadResponse struct {
}

func NewUploadToDeltaProcessor(ln *core.LightNode, contentToProcess core.Content, fileNode io.Reader) IProcessor {
DELTA_UPLOAD_API = ln.Config.ExternalApi.ApiUrl
DELTA_UPLOAD_API = ln.Config.ExternalApi.DeltaNodeApiUrl
REPLICATION_FACTOR = string(ln.Config.Common.ReplicationFactor)
return &UploadToDeltaProcessor{
contentToProcess,
Expand Down
8 changes: 4 additions & 4 deletions jobs/upload_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type UploadBundleToDeltaProcessor struct {
}

func NewUploadBundleToDeltaProcessor(ln *core.LightNode, reader io.Reader, bundle core.Bundle) IProcessor {
DELTA_UPLOAD_API = ln.Config.ExternalApi.ApiUrl
DELTA_UPLOAD_API = ln.Config.ExternalApi.DeltaNodeApiUrl
REPLICATION_FACTOR = string(ln.Config.Common.ReplicationFactor)
return &UploadBundleToDeltaProcessor{

Expand All @@ -50,12 +50,12 @@ func (r *UploadBundleToDeltaProcessor) Run() error {
payload := &bytes.Buffer{}
writer := multipart.NewWriter(payload)

partFile, err := writer.CreateFormFile("data", r.Bundle.Cid)
partFile, err := writer.CreateFormFile("data", r.Bundle.FileCid)
if err != nil {
fmt.Println("CreateFormFile error: ", err)
return nil
}
cidToGet, err := cid.Decode(r.Bundle.Cid)
cidToGet, err := cid.Decode(r.Bundle.FileCid)
if err != nil {
fmt.Println("Error decoding cid: ", err)
return nil
Expand Down Expand Up @@ -120,7 +120,7 @@ func (r *UploadBundleToDeltaProcessor) Run() error {
if err != nil || res.StatusCode != http.StatusOK {
fmt.Println("Error uploading car to delta: ", err)
r.Bundle.Status = "error"
r.Bundle.LastMessage = err.Error()
r.Bundle.LastMessage = "Error uploading car to delta"
r.LightNode.DB.Save(&r.Bundle)
time.Sleep(retryInterval)
continue
Expand Down
2 changes: 1 addition & 1 deletion jobs/upload_car.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type UploadCarToDeltaProcessor struct {
}

func NewUploadCarToDeltaProcessor(ln *core.LightNode, bucket core.Bucket, rootCid string) IProcessor {
DELTA_UPLOAD_API = ln.Config.ExternalApi.ApiUrl
DELTA_UPLOAD_API = ln.Config.ExternalApi.DeltaNodeApiUrl
REPLICATION_FACTOR = string(ln.Config.Common.ReplicationFactor)
return &UploadCarToDeltaProcessor{
bucket,
Expand Down

0 comments on commit 0800483

Please sign in to comment.