Skip to content

Commit

Permalink
Error handling improvement for storage and util packages (#444)
Browse files Browse the repository at this point in the history
* feat: error handling for storage and util

Signed-off-by: Jingfu Wang <jingfu.wang@coinbase.com>

* fix: make gen

Signed-off-by: Jingfu Wang <jingfu.wang@coinbase.com>

Signed-off-by: Jingfu Wang <jingfu.wang@coinbase.com>
  • Loading branch information
GeekArthur committed Sep 7, 2022
1 parent 1ffb630 commit 7712c7d
Show file tree
Hide file tree
Showing 21 changed files with 660 additions and 701 deletions.
6 changes: 5 additions & 1 deletion asserter/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,11 @@ func (a *Asserter) Transaction(
// invalid transaction identifiers, or a direction not defined by the enum.
func (a *Asserter) RelatedTransactions(relatedTransactions []*types.RelatedTransaction) error {
if dup := DuplicateRelatedTransaction(relatedTransactions); dup != nil {
return fmt.Errorf("%w: %v", ErrDuplicateRelatedTransaction, dup)
return fmt.Errorf(
"related transaction %s is invalid: %w",
types.PrintStruct(dup),
ErrDuplicateRelatedTransaction,
)
}

for i, relatedTransaction := range relatedTransactions {
Expand Down
2 changes: 1 addition & 1 deletion constructor/worker/populator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func PopulateInput(state string, input string) (string, error) {
return value.Raw
})
if err != nil {
return "", fmt.Errorf("%w: unable to insert variables", err)
return "", fmt.Errorf("unable to insert variables: %w", err)
}

if !gjson.Valid(input) {
Expand Down
10 changes: 5 additions & 5 deletions constructor/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (w *Worker) invokeWorker(
case job.GetBlob:
return w.GetBlobWorker(ctx, dbTx, input)
default:
return "", fmt.Errorf("%w: %s", ErrInvalidActionType, action)
return "", ErrInvalidActionType
}
}

Expand Down Expand Up @@ -237,7 +237,7 @@ func GenerateKeyWorker(rawInput string) (string, error) {
var input job.GenerateKeyInput
err := job.UnmarshalInput([]byte(rawInput), &input)
if err != nil {
return "", fmt.Errorf("%w: %s", ErrInvalidInput, err.Error())
return "", fmt.Errorf("failed to unmarshal input: %w", err)
}

kp, err := keys.GenerateKeypair(input.CurveType)
Expand Down Expand Up @@ -303,7 +303,7 @@ func MathWorker(rawInput string) (string, error) {
var input job.MathInput
err := job.UnmarshalInput([]byte(rawInput), &input)
if err != nil {
return "", fmt.Errorf("%w: %s", ErrInvalidInput, err.Error())
return "", fmt.Errorf("failed to unmarshal input: %w", err)
}

var result string
Expand Down Expand Up @@ -880,10 +880,10 @@ func HTTPRequestWorker(rawInput string) (string, error) {

if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf(
"%w: status code %d with body %s",
ErrActionFailed,
"status code %d with body %s: %w",
resp.StatusCode,
body,
ErrActionFailed,
)
}

Expand Down
2 changes: 1 addition & 1 deletion constructor/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1474,7 +1474,7 @@ func TestJob_Failures(t *testing.T) {
OutputPath: "key",
},
ProcessedInput: `{"curve_typ": "secp256k1"}`,
Err: ErrInvalidInput,
Err: fmt.Errorf("unknown field \"curve_typ\""),
},
helper: &mocks.Helper{},
},
Expand Down
4 changes: 2 additions & 2 deletions fetcher/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ func tryAgain(fetchMsg string, thisBackoff *Backoff, err *Error) *Error {
if nextBackoff == backoff.Stop {
return &Error{
Err: fmt.Errorf(
"%w: %s",
ErrExhaustedRetries,
"fetch message %s: %w",
fetchMsg,
ErrExhaustedRetries,
),
}
}
Expand Down
4 changes: 2 additions & 2 deletions parser/intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ func (p *Parser) ExpectedOperations(

if !foundMatch && errExtra {
return fmt.Errorf(
"%w: %s",
"operation %s: %w",
types.PrintStruct(obs),
ErrExpectedOperationsExtraOperation,
types.PrettyPrintStruct(obs),
)
}
}
Expand Down
73 changes: 47 additions & 26 deletions storage/database/badger_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,13 @@ func NewBadgerDatabase(

db, err := badger.Open(b.badgerOptions)
if err != nil {
return nil, fmt.Errorf("%w: %v", storageErrs.ErrDatabaseOpenFailed, err)
return nil, fmt.Errorf("unable to open database: %w", err)
}
b.db = db

encoder, err := encoder.NewEncoder(b.compressorEntries, b.pool, b.compress)
if err != nil {
return nil, fmt.Errorf("%w: %v", storageErrs.ErrCompressorLoadFailed, err)
return nil, fmt.Errorf("unable to load compressor: %w", err)
}
b.encoder = encoder

Expand All @@ -286,7 +286,7 @@ func (b *BadgerDatabase) Close(ctx context.Context) error {
close(b.closed)

if err := b.db.Close(); err != nil {
return fmt.Errorf("%w: %v", storageErrs.ErrDBCloseFailed, err)
return fmt.Errorf("unable to close badger database: %w", err)
}

return nil
Expand Down Expand Up @@ -436,7 +436,7 @@ func (b *BadgerTransaction) Commit(context.Context) error {
b.releaseLocks()

if err != nil {
return fmt.Errorf("%w: %v", storageErrs.ErrCommitFailed, err)
return fmt.Errorf("unable to commit transaction: %w", err)
}

return nil
Expand Down Expand Up @@ -494,15 +494,19 @@ func (b *BadgerTransaction) Get(
if err == badger.ErrKeyNotFound {
return false, nil, nil
} else if err != nil {
return false, nil, err
return false, nil, fmt.Errorf("unable to get the item of key %s within a transaction: %w", string(key), err)
}

err = item.Value(func(v []byte) error {
_, err := value.Write(v)
return err
})
if err != nil {
return false, nil, err
return false, nil, fmt.Errorf(
"unable to get the value from the item for key %s: %w",
string(key),
err,
)
}

return true, value.Bytes(), nil
Expand Down Expand Up @@ -539,13 +543,17 @@ func (b *BadgerTransaction) Scan(
k := item.Key()
err := item.Value(func(v []byte) error {
if err := worker(k, v); err != nil {
return fmt.Errorf("%w: worker failed for key %s", err, string(k))
return fmt.Errorf("worker failed for key %s: %w", string(k), err)
}

return nil
})
if err != nil {
return -1, fmt.Errorf("%w: unable to get value for key %s", err, string(k))
return -1, fmt.Errorf(
"unable to get the value from the item for key %s: %w",
string(k),
err,
)
}

entries++
Expand All @@ -568,7 +576,12 @@ func decompressAndSave(
// encoded using dictionary compression.
decompressed, err := encoder.DecodeRaw(namespace, v)
if err != nil {
return -1, -1, fmt.Errorf("%w %s: %v", storageErrs.ErrDecompressFailed, string(k), err)
return -1, -1, fmt.Errorf(
"unable to decompress for namespace %s and input %s: %w",
namespace,
string(v),
err,
)
}

err = ioutil.WriteFile(
Expand All @@ -577,7 +590,11 @@ func decompressAndSave(
os.FileMode(utils.DefaultFilePermissions),
)
if err != nil {
return -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrDecompressSaveUnsuccessful, err)
return -1, -1, fmt.Errorf(
"unable to write decompress file %s: %w",
path.Join(tmpDir, types.Hash(string(k))),
err,
)
}

return float64(len(decompressed)), float64(len(v)), nil
Expand All @@ -591,27 +608,26 @@ func decompressAndEncode(
decompressed, err := ioutil.ReadFile(path) // #nosec G304
if err != nil {
return -1, -1, -1, fmt.Errorf(
"%w for file %s: %v",
storageErrs.ErrLoadFileUnsuccessful,
"unable to read decompress file %s: %w",
path,
err,
)
}

normalCompress, err := encoder.EncodeRaw("", decompressed)
if err != nil {
return -1, -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrCompressNormalFailed, err)
return -1, -1, -1, fmt.Errorf("unable to compress normal: %w", err)
}

dictCompress, err := encoder.EncodeRaw(namespace, decompressed)
if err != nil {
return -1, -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrCompressWithDictFailed, err)
return -1, -1, -1, fmt.Errorf("unable to compress with dictionary: %w", err)
}

// Ensure dict works
decompressedDict, err := encoder.DecodeRaw(namespace, dictCompress)
if err != nil {
return -1, -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrDecompressWithDictFailed, err)
return -1, -1, -1, fmt.Errorf("unable to decompress with dictionary: %w", err)
}

if types.Hash(decompressed) != types.Hash(decompressedDict) {
Expand Down Expand Up @@ -644,12 +660,17 @@ func recompress(
func(k []byte, v []byte) error {
decompressed, err := badgerDb.Encoder().DecodeRaw(namespace, v)
if err != nil {
return fmt.Errorf("%w %s: %v", storageErrs.ErrDecompressFailed, string(k), err)
return fmt.Errorf(
"unable to decompress for namespace %s and input %s: %w",
namespace,
string(v),
err,
)
}

newCompressed, err := newCompressor.EncodeRaw(namespace, decompressed)
if err != nil {
return fmt.Errorf("%w: %v", storageErrs.ErrCompressWithDictFailed, err)
return fmt.Errorf("unable to compress with dictionary: %w", err)
}
onDiskSize += float64(len(v))
newSize += float64(len(newCompressed))
Expand All @@ -660,7 +681,7 @@ func recompress(
false,
)
if err != nil {
return -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrRecompressFailed, err)
return -1, -1, fmt.Errorf("unable to recompress: %w", err)
}

// Negative savings here means that the new dictionary
Expand Down Expand Up @@ -691,14 +712,14 @@ func BadgerTrain(
WithCompressorEntries(compressorEntries),
)
if err != nil {
return -1, -1, fmt.Errorf("%w: unable to load database", err)
return -1, -1, fmt.Errorf("unable to load database: %w", err)
}
defer badgerDb.Close(ctx)

// Create directory to store uncompressed files for training
tmpDir, err := utils.CreateTempDir()
if err != nil {
return -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrCreateTempDirectoryFailed, err)
return -1, -1, fmt.Errorf("unable to create temporary directory: %w", err)
}
defer utils.RemoveTempDir(tmpDir)

Expand All @@ -724,7 +745,7 @@ func BadgerTrain(
v,
)
if err != nil {
return fmt.Errorf("%w: unable to decompress and save", err)
return fmt.Errorf("unable to decompress and save: %w", err)
}

totalUncompressedSize += decompressedSize
Expand Down Expand Up @@ -774,11 +795,11 @@ func BadgerTrain(
dictPath,
) // #nosec G204
if err := cmd.Start(); err != nil {
return -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrInvokeZSTDFailed, err)
return -1, -1, fmt.Errorf("unable to start zstd: %w", err)
}

if err := cmd.Wait(); err != nil {
return -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrTrainZSTDFailed, err)
return -1, -1, fmt.Errorf("unable to train zstd: %w", err)
}

encoder, err := encoder.NewEncoder([]*encoder.CompressorEntry{
Expand All @@ -788,7 +809,7 @@ func BadgerTrain(
},
}, encoder.NewBufferPool(), true)
if err != nil {
return -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrCompressorLoadFailed, err)
return -1, -1, fmt.Errorf("unable to load compressor: %w", err)
}

sizeUncompressed := float64(0)
Expand All @@ -809,7 +830,7 @@ func BadgerTrain(
encoder,
)
if err != nil {
return fmt.Errorf("%w: unable to decompress and encode", err)
return fmt.Errorf("unable to decompress and encode: %w", err)
}

sizeUncompressed += decompressed
Expand All @@ -819,7 +840,7 @@ func BadgerTrain(
return nil
})
if err != nil {
return -1, -1, fmt.Errorf("%w: %v", storageErrs.ErrWalkFilesFailed, err)
return -1, -1, fmt.Errorf("unable to walk files: %w", err)
}

log.Printf(
Expand Down
Loading

0 comments on commit 7712c7d

Please sign in to comment.