Skip to content

Commit

Permalink
Merge pull request #8433 from youngoli/beam7087-3
Browse files: browse the repository at this point in the history
[BEAM-7154] Updating Go SDK errors (Part 1)
  • Loading branch information
aaltay committed May 1, 2019
2 parents fdf84ab + 62741ed commit 70cf8c6
Show file tree
Hide file tree
Showing 14 changed files with 83 additions and 78 deletions.
30 changes: 15 additions & 15 deletions sdks/go/pkg/beam/artifact/gcsproxy/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
package gcsproxy

import (
"fmt"
"io"

"cloud.google.com/go/storage"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
"github.com/golang/protobuf/proto"
Expand All @@ -38,20 +38,20 @@ type RetrievalServer struct {
func ReadProxyManifest(ctx context.Context, object string) (*pb.ProxyManifest, error) {
bucket, obj, err := gcsx.ParseObject(object)
if err != nil {
return nil, fmt.Errorf("invalid manifest object %v: %v", object, err)
return nil, errors.Wrapf(err, "invalid manifest object %v", object)
}

cl, err := gcsx.NewClient(ctx, storage.ScopeReadOnly)
if err != nil {
return nil, fmt.Errorf("failed to create GCS client: %v", err)
return nil, errors.Wrap(err, "failed to create GCS client")
}
content, err := gcsx.ReadObject(ctx, cl, bucket, obj)
if err != nil {
return nil, fmt.Errorf("failed to read manifest %v: %v", object, err)
return nil, errors.Wrapf(err, "failed to read manifest %v", object)
}
var md pb.ProxyManifest
if err := proto.Unmarshal(content, &md); err != nil {
return nil, fmt.Errorf("invalid manifest %v: %v", object, err)
return nil, errors.Wrapf(err, "invalid manifest %v", object)
}
return &md, nil
}
Expand All @@ -66,7 +66,7 @@ func NewRetrievalServer(md *pb.ProxyManifest) (*RetrievalServer, error) {
blobs := make(map[string]string)
for _, l := range md.GetLocation() {
if _, _, err := gcsx.ParseObject(l.GetUri()); err != nil {
return nil, fmt.Errorf("location %v is not a GCS object: %v", l.GetUri(), err)
return nil, errors.Wrapf(err, "location %v is not a GCS object", l.GetUri())
}
blobs[l.GetName()] = l.GetUri()
}
Expand All @@ -83,21 +83,21 @@ func (s *RetrievalServer) GetArtifact(req *pb.GetArtifactRequest, stream pb.Arti
key := req.GetName()
blob, ok := s.blobs[key]
if !ok {
return fmt.Errorf("artifact %v not found", key)
return errors.Errorf("artifact %v not found", key)
}

bucket, object := parseObject(blob)

ctx := stream.Context()
client, err := gcsx.NewClient(ctx, storage.ScopeReadOnly)
if err != nil {
return fmt.Errorf("Failed to create client for %v: %v", key, err)
return errors.Wrapf(err, "Failed to create client for %v", key)
}

// Stream artifact in up to 1MB chunks.
r, err := client.Bucket(bucket).Object(object).NewReader(ctx)
if err != nil {
return fmt.Errorf("Failed to read object for %v: %v", key, err)
return errors.Wrapf(err, "Failed to read object for %v", key)
}
defer r.Close()

Expand All @@ -106,14 +106,14 @@ func (s *RetrievalServer) GetArtifact(req *pb.GetArtifactRequest, stream pb.Arti
n, err := r.Read(data)
if n > 0 {
if err := stream.Send(&pb.ArtifactChunk{Data: data[:n]}); err != nil {
return fmt.Errorf("chunk send failed: %v", err)
return errors.Wrap(err, "chunk send failed")
}
}
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("failed to read from %v: %v", blob, err)
return errors.Wrapf(err, "failed to read from %v", blob)
}
}
return nil
Expand All @@ -123,24 +123,24 @@ func validate(md *pb.ProxyManifest) error {
keys := make(map[string]bool)
for _, a := range md.GetManifest().GetArtifact() {
if _, seen := keys[a.Name]; seen {
return fmt.Errorf("multiple artifact with name %v", a.Name)
return errors.Errorf("multiple artifact with name %v", a.Name)
}
keys[a.Name] = true
}
for _, l := range md.GetLocation() {
fresh, seen := keys[l.Name]
if !seen {
return fmt.Errorf("no artifact named %v for location %v", l.Name, l.Uri)
return errors.Errorf("no artifact named %v for location %v", l.Name, l.Uri)
}
if !fresh {
return fmt.Errorf("multiple locations for %v:%v", l.Name, l.Uri)
return errors.Errorf("multiple locations for %v:%v", l.Name, l.Uri)
}
keys[l.Name] = false
}

for key, fresh := range keys {
if fresh {
return fmt.Errorf("no location for %v", key)
return errors.Errorf("no location for %v", key)
}
}
return nil
Expand Down
25 changes: 12 additions & 13 deletions sdks/go/pkg/beam/artifact/gcsproxy/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ import (
"bytes"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"hash"
"path"
"sync"

"cloud.google.com/go/storage"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
"github.com/golang/protobuf/proto"
Expand All @@ -52,7 +51,7 @@ type staged struct {
func NewStagingServer(manifest string) (*StagingServer, error) {
bucket, object, err := gcsx.ParseObject(manifest)
if err != nil {
return nil, fmt.Errorf("invalid manifest location: %v", err)
return nil, errors.Wrap(err, "invalid manifest location")
}
root := path.Join(path.Dir(object), "blobs")

Expand All @@ -78,15 +77,15 @@ func (s *StagingServer) CommitManifest(ctx context.Context, req *pb.CommitManife

data, err := proto.Marshal(&pb.ProxyManifest{Manifest: manifest, Location: loc})
if err != nil {
return nil, fmt.Errorf("failed to marshal proxy manifest: %v", err)
return nil, errors.Wrap(err, "failed to marshal proxy manifest")
}

cl, err := gcsx.NewClient(ctx, storage.ScopeReadWrite)
if err != nil {
return nil, fmt.Errorf("failed to create GCS client: %v", err)
return nil, errors.Wrap(err, "failed to create GCS client")
}
if err := gcsx.WriteObject(ctx, cl, s.bucket, s.manifest, bytes.NewReader(data)); err != nil {
return nil, fmt.Errorf("failed to write manifest: %v", err)
return nil, errors.Wrap(err, "failed to write manifest")
}

// Commit returns the location of the manifest as the token, which can
Expand All @@ -104,13 +103,13 @@ func matchLocations(artifacts []*pb.ArtifactMetadata, blobs map[string]staged) (
for _, a := range artifacts {
info, ok := blobs[a.Name]
if !ok {
return nil, fmt.Errorf("artifact %v not staged", a.Name)
return nil, errors.Errorf("artifact %v not staged", a.Name)
}
if a.Sha256 == "" {
a.Sha256 = info.hash
}
if info.hash != a.Sha256 {
return nil, fmt.Errorf("staged artifact for %v has invalid SHA256: %v, want %v", a.Name, info.hash, a.Sha256)
return nil, errors.Errorf("staged artifact for %v has invalid SHA256: %v, want %v", a.Name, info.hash, a.Sha256)
}

loc = append(loc, &pb.ProxyManifest_Location{Name: a.Name, Uri: info.object})
Expand All @@ -124,11 +123,11 @@ func (s *StagingServer) PutArtifact(ps pb.ArtifactStagingService_PutArtifactServ

header, err := ps.Recv()
if err != nil {
return fmt.Errorf("failed to receive header: %v", err)
return errors.Wrap(err, "failed to receive header")
}
md := header.GetMetadata().GetMetadata()
if md == nil {
return fmt.Errorf("expected header as first message: %v", header)
return errors.Errorf("expected header as first message: %v", header)
}
object := path.Join(s.root, md.Name)

Expand All @@ -138,16 +137,16 @@ func (s *StagingServer) PutArtifact(ps pb.ArtifactStagingService_PutArtifactServ
ctx := ps.Context()
cl, err := gcsx.NewClient(ctx, storage.ScopeReadWrite)
if err != nil {
return fmt.Errorf("failed to create GCS client: %v", err)
return errors.Wrap(err, "failed to create GCS client")
}

r := &reader{sha256W: sha256.New(), stream: ps}
if err := gcsx.WriteObject(ctx, cl, s.bucket, object, r); err != nil {
return fmt.Errorf("failed to stage artifact %v: %v", md.Name, err)
return errors.Wrapf(err, "failed to stage artifact %v", md.Name)
}
hash := r.SHA256()
if md.Sha256 != "" && md.Sha256 != hash {
return fmt.Errorf("invalid SHA256 for artifact %v: %v want %v", md.Name, hash, md.Sha256)
return errors.Errorf("invalid SHA256 for artifact %v: %v want %v", md.Name, hash, md.Sha256)
}

s.mu.Lock()
Expand Down
18 changes: 9 additions & 9 deletions sdks/go/pkg/beam/artifact/materialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"math/rand"
"os"
Expand All @@ -30,6 +29,7 @@ import (
"sync"
"time"

"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
Expand All @@ -50,7 +50,7 @@ func Materialize(ctx context.Context, endpoint string, rt string, dest string) (

m, err := client.GetManifest(ctx, &pb.GetManifestRequest{RetrievalToken: rt})
if err != nil {
return nil, fmt.Errorf("failed to get manifest: %v", err)
return nil, errors.Wrap(err, "failed to get manifest")
}
md := m.GetManifest().GetArtifact()
return md, MultiRetrieve(ctx, client, 10, md, rt, dest)
Expand Down Expand Up @@ -92,7 +92,7 @@ func MultiRetrieve(ctx context.Context, client pb.ArtifactRetrievalServiceClient
}
failures = append(failures, err.Error())
if len(failures) > attempts {
permErr.TrySetError(fmt.Errorf("failed to retrieve %v in %v attempts: %v", a.Name, attempts, strings.Join(failures, "; ")))
permErr.TrySetError(errors.Errorf("failed to retrieve %v in %v attempts: %v", a.Name, attempts, strings.Join(failures, "; ")))
break // give up
}
time.Sleep(time.Duration(rand.Intn(5)+1) * time.Second)
Expand All @@ -114,7 +114,7 @@ func Retrieve(ctx context.Context, client pb.ArtifactRetrievalServiceClient, a *

_, err := os.Stat(filename)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to stat %v: %v", filename, err)
return errors.Wrapf(err, "failed to stat %v", filename)
}
if err == nil {
// File already exists. Validate or delete.
Expand All @@ -130,7 +130,7 @@ func Retrieve(ctx context.Context, client pb.ArtifactRetrievalServiceClient, a *
}

if err2 := os.Remove(filename); err2 != nil {
return fmt.Errorf("failed to both validate %v and delete: %v (remove: %v)", filename, err, err2)
return errors.Errorf("failed to both validate %v and delete: %v (remove: %v)", filename, err, err2)
} // else: successfully deleted bad file.
} // else: file does not exist.

Expand Down Expand Up @@ -159,19 +159,19 @@ func retrieve(ctx context.Context, client pb.ArtifactRetrievalServiceClient, a *
sha256Hash, err := retrieveChunks(stream, w)
if err != nil {
fd.Close() // drop any buffered content
return fmt.Errorf("failed to retrieve chunk for %v: %v", filename, err)
return errors.Wrapf(err, "failed to retrieve chunk for %v", filename)
}
if err := w.Flush(); err != nil {
fd.Close()
return fmt.Errorf("failed to flush chunks for %v: %v", filename, err)
return errors.Wrapf(err, "failed to flush chunks for %v", filename)
}
if err := fd.Close(); err != nil {
return err
}

// Artifact Sha256 hash is an optional field in metadata so we should only validate when it's present.
if a.Sha256 != "" && sha256Hash != a.Sha256 {
return fmt.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.Sha256)
return errors.Errorf("bad SHA256 for %v: %v, want %v", filename, sha256Hash, a.Sha256)
}
return nil
}
Expand All @@ -191,7 +191,7 @@ func retrieveChunks(stream pb.ArtifactRetrievalService_GetArtifactClient, w io.W
panic(err) // cannot fail
}
if _, err := w.Write(chunk.Data); err != nil {
return "", fmt.Errorf("chunk write failed: %v", err)
return "", errors.Wrapf(err, "chunk write failed")
}
}
return hex.EncodeToString(sha256W.Sum(nil)), nil
Expand Down
26 changes: 13 additions & 13 deletions sdks/go/pkg/beam/artifact/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
package artifact

import (
"fmt"
"io"
"net"
"sync"
"testing"
"time"

"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
pb "github.com/apache/beam/sdks/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
"golang.org/x/net/context"
Expand Down Expand Up @@ -77,14 +77,14 @@ func (s *server) PutArtifact(ps pb.ArtifactStagingService_PutArtifactServer) err

header, err := ps.Recv()
if err != nil {
return fmt.Errorf("failed to receive header: %v", err)
return errors.Wrap(err, "failed to receive header")
}
if header.GetMetadata() == nil {
return fmt.Errorf("expected header as first message: %v", header)
return errors.Errorf("expected header as first message: %v", header)
}
key := header.GetMetadata().GetMetadata().Name
if header.GetMetadata().GetStagingSessionToken() == "" {
return fmt.Errorf("missing staging session token")
return errors.New("missing staging session token")
}
token := header.GetMetadata().GetStagingSessionToken()

Expand All @@ -101,10 +101,10 @@ func (s *server) PutArtifact(ps pb.ArtifactStagingService_PutArtifactServer) err
}

if msg.GetData() == nil {
return fmt.Errorf("expected data: %v", msg)
return errors.Errorf("expected data: %v", msg)
}
if len(msg.GetData().GetData()) == 0 {
return fmt.Errorf("expected non-empty data: %v", msg)
return errors.Errorf("expected non-empty data: %v", msg)
}
chunks = append(chunks, msg.GetData().GetData())
}
Expand All @@ -124,7 +124,7 @@ func (s *server) PutArtifact(ps pb.ArtifactStagingService_PutArtifactServer) err
func (s *server) CommitManifest(ctx context.Context, req *pb.CommitManifestRequest) (*pb.CommitManifestResponse, error) {
token := req.GetStagingSessionToken()
if token == "" {
return nil, fmt.Errorf("missing staging session token")
return nil, errors.New("missing staging session token")
}

m := s.getManifest(token, true)
Expand All @@ -136,7 +136,7 @@ func (s *server) CommitManifest(ctx context.Context, req *pb.CommitManifestReque
artifacts := req.GetManifest().GetArtifact()
for _, md := range artifacts {
if _, ok := m.m[md.Name]; !ok {
return nil, fmt.Errorf("artifact %v not staged", md.Name)
return nil, errors.Errorf("artifact %v not staged", md.Name)
}
}

Expand All @@ -153,12 +153,12 @@ func (s *server) CommitManifest(ctx context.Context, req *pb.CommitManifestReque
func (s *server) GetManifest(ctx context.Context, req *pb.GetManifestRequest) (*pb.GetManifestResponse, error) {
token := req.GetRetrievalToken()
if token == "" {
return nil, fmt.Errorf("missing retrieval token")
return nil, errors.New("missing retrieval token")
}

m := s.getManifest(token, false)
if m == nil || m.md == nil {
return nil, fmt.Errorf("manifest for %v not found", token)
return nil, errors.Errorf("manifest for %v not found", token)
}
m.mu.Lock()
defer m.mu.Unlock()
Expand All @@ -169,12 +169,12 @@ func (s *server) GetManifest(ctx context.Context, req *pb.GetManifestRequest) (*
func (s *server) GetArtifact(req *pb.GetArtifactRequest, stream pb.ArtifactRetrievalService_GetArtifactServer) error {
token := req.GetRetrievalToken()
if token == "" {
return fmt.Errorf("missing retrieval token")
return errors.New("missing retrieval token")
}

m := s.getManifest(token, false)
if m == nil || m.md == nil {
return fmt.Errorf("manifest for %v not found", token)
return errors.Errorf("manifest for %v not found", token)
}

// Validate artifact and grab chunks so that we can stream them without
Expand All @@ -184,7 +184,7 @@ func (s *server) GetArtifact(req *pb.GetArtifactRequest, stream pb.ArtifactRetri
elm, ok := m.m[req.GetName()]
if !ok || elm.md == nil {
m.mu.Unlock()
return fmt.Errorf("manifest for %v does not contain artifact %v", token, req.GetName())
return errors.Errorf("manifest for %v does not contain artifact %v", token, req.GetName())
}
chunks := elm.chunks
m.mu.Unlock()
Expand Down

0 comments on commit 70cf8c6

Please sign in to comment.