-
Notifications
You must be signed in to change notification settings - Fork 1
/
full_sync.go
132 lines (112 loc) · 4.44 KB
/
full_sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package c1api
import (
"context"
"errors"
"io"
"os"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
v1 "github.com/conductorone/baton-sdk/pb/c1/connectorapi/baton/v1"
"github.com/conductorone/baton-sdk/pkg/annotations"
sdkSync "github.com/conductorone/baton-sdk/pkg/sync"
"github.com/conductorone/baton-sdk/pkg/tasks"
"github.com/conductorone/baton-sdk/pkg/types"
)
// fullSyncHelpers is the set of operations the full sync task handler needs
// from its owner in order to run a sync, upload the result, and report on the task.
type fullSyncHelpers interface {
// ConnectorClient returns the connector client handed to the syncer.
ConnectorClient() types.ConnectorClient
// Upload is called with the opened sync asset (c1z file) after a successful sync.
Upload(ctx context.Context, r io.ReadSeeker) error
// FinishTask is called once the handler is done with the task; err is nil on
// success and the failure otherwise. Its return value becomes the handler's result.
FinishTask(ctx context.Context, resp proto.Message, annos annotations.Annotations, err error) error
// HeartbeatTask is called before syncing; the context it returns replaces the
// handler's context for the remainder of the task.
HeartbeatTask(ctx context.Context, annos annotations.Annotations) (context.Context, error)
// TempDir returns the directory used for the temporary c1z file and passed to
// the syncer as its temp dir.
TempDir() string
}
// fullSyncTaskHandler handles a single full sync task: it runs a sync into a
// temporary c1z file and uploads the result via its helpers.
type fullSyncTaskHandler struct {
// task is the task being handled; its id and type are attached to log lines.
task *v1.Task
// helpers provides the connector client, temp dir, upload, heartbeat and
// task-completion operations.
helpers fullSyncHelpers
}
// sync creates a syncer configured with c1zPath and the helpers' temp dir,
// runs it, and closes it before returning.
//
// The syncer is closed on both the success and the failure path. When the
// sync itself fails and the close fails as well, both errors are joined so
// neither is lost. Every failure is logged with the task id and task type.
func (c *fullSyncTaskHandler) sync(ctx context.Context, c1zPath string) error {
	l := ctxzap.Extract(ctx).With(zap.String("task_id", c.task.GetId()), zap.Stringer("task_type", tasks.GetType(c.task)))

	syncer, err := sdkSync.NewSyncer(ctx, c.helpers.ConnectorClient(), sdkSync.WithC1ZPath(c1zPath), sdkSync.WithTmpDir(c.helpers.TempDir()))
	if err != nil {
		l.Error("failed to create syncer", zap.Error(err))
		return err
	}

	// TODO(jirwin): Should we attempt to retry at all before failing the task?
	err = syncer.Sync(ctx)
	if err != nil {
		l.Error("failed to sync", zap.Error(err))
		// We don't defer syncer.Close() in order to capture the error without named return values.
		if closeErr := syncer.Close(ctx); closeErr != nil {
			// Log the close failure itself; the sync error was already logged above.
			l.Error("failed to close syncer after sync error", zap.Error(closeErr))
			err = errors.Join(err, closeErr)
		}
		return err
	}

	if err := syncer.Close(ctx); err != nil {
		l.Error("failed to close syncer", zap.Error(err))
		return err
	}

	return nil
}
// HandleTask runs a full sync for the handler's task and uploads the result.
//
// Steps, in order: create a temp file in the helpers' temp dir to hold the
// c1z, heartbeat the task, run the sync into that file, reopen the file and
// upload it, then finish the task. Any failure other than the heartbeat is
// reported through FinishTask, whose return value is returned; a heartbeat
// failure is returned directly without calling FinishTask. The temp file is
// removed on every exit path once it has been created.
//
// TODO(morgabra) We should handle task resumption here. The task should contain at least an active sync id so we can
// resume syncing if we get restarted or fail to heartbeat temporarily.
// TODO(morgabra) Ideally we can tell the difference between a task cancellation and a task failure via the result
// of HeartbeatTask(). If we get cancelled, we probably want to clean up our sync state. If we fail to heartbeat, we
// might want to keep our sync state around so we can resume the task.
// TODO(morgabra) If we have a task with no sync_id set, we should create one and set it via heartbeat annotations? If we have a
// task with a sync_id and it doesn't match our current state sync_id, we should reject the task. If we have a task
// with a sync_id that does match our current state, we should resume our current sync, if possible.
func (c *fullSyncTaskHandler) HandleTask(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	l := ctxzap.Extract(ctx).With(zap.String("task_id", c.task.GetId()), zap.Stringer("task_type", tasks.GetType(c.task)))
	l.Info("Handling full sync task.")

	assetFile, err := os.CreateTemp(c.helpers.TempDir(), "baton-sdk-sync-upload")
	if err != nil {
		l.Error("failed to create temp file", zap.Error(err))
		return c.helpers.FinishTask(ctx, nil, nil, err)
	}
	c1zPath := assetFile.Name()

	// Register removal immediately so the temp file does not leak when a later
	// step (close, heartbeat, sync, open, upload) fails. Each task gets a fresh
	// temp file, so a leftover one is never reused. Deferred calls run in
	// reverse order, so the upload handle below is closed before this runs.
	defer func() {
		if rmErr := os.Remove(c1zPath); rmErr != nil {
			l.Error("failed to remove temp file", zap.Error(rmErr), zap.String("path", c1zPath))
		}
	}()

	// Only the path is needed from here on; the syncer is given the path, not the handle.
	err = assetFile.Close()
	if err != nil {
		l.Error("failed to close temp file", zap.Error(err), zap.String("path", c1zPath))
		return c.helpers.FinishTask(ctx, nil, nil, err)
	}

	// TODO(morgabra) Add annotation for sync_id, or come up with some other way to track sync state.
	// The context returned by the heartbeat is used for everything that follows.
	ctx, err = c.helpers.HeartbeatTask(ctx, nil)
	if err != nil {
		l.Error("failed to heartbeat task", zap.Error(err))
		return err
	}

	err = c.sync(ctx, c1zPath)
	if err != nil {
		l.Error("failed to sync", zap.Error(err))
		return c.helpers.FinishTask(ctx, nil, nil, err)
	}

	c1zF, err := os.Open(c1zPath)
	if err != nil {
		l.Error("failed to open sync asset prior to upload", zap.Error(err))
		return c.helpers.FinishTask(ctx, nil, nil, err)
	}
	defer func(f *os.File) {
		// Use a local error so the enclosing function's err is not overwritten.
		if closeErr := f.Close(); closeErr != nil {
			l.Error("failed to close sync asset", zap.Error(closeErr), zap.String("path", f.Name()))
		}
	}(c1zF)

	err = c.helpers.Upload(ctx, c1zF)
	if err != nil {
		l.Error("failed to upload sync asset", zap.Error(err))
		return c.helpers.FinishTask(ctx, nil, nil, err)
	}

	return c.helpers.FinishTask(ctx, nil, nil, nil)
}
// newFullSyncTaskHandler builds a full sync handler for task that uses helpers
// for its connector client, temp dir, upload, heartbeat and completion calls.
func newFullSyncTaskHandler(task *v1.Task, helpers fullSyncHelpers) tasks.TaskHandler {
	handler := new(fullSyncTaskHandler)
	handler.task = task
	handler.helpers = helpers
	return handler
}