/
feed_asset.go
139 lines (117 loc) · 3.86 KB
/
feed_asset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package feed
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/SafetyCulture/safetyculture-exporter/pkg/httpapi"
"github.com/SafetyCulture/safetyculture-exporter/pkg/internal/events"
"github.com/SafetyCulture/safetyculture-exporter/pkg/internal/util"
"github.com/SafetyCulture/safetyculture-exporter/pkg/logger"
)
// Asset represents a row from the assets feed.
// The struct tags give each field's name in JSON and in CSV and, where a gorm
// tag is present, its database settings.
type Asset struct {
// ID is the asset identifier; gorm treats it as the primary key, stored in column asset_id with size 36.
ID string `json:"id" csv:"asset_id" gorm:"primarykey;column:asset_id;size:36"`
// Code is the asset code.
Code string `json:"code" csv:"code"`
// TypeID is the identifier of the asset's type.
TypeID string `json:"type_id" csv:"type_id"`
// TypeName is the name of the asset's type.
TypeName string `json:"type_name" csv:"type_name"`
// Fields is carried as a single string.
// NOTE(review): presumably serialized field data; confirm the format against the API.
Fields string `json:"fields" csv:"fields"`
// CreatedAt is the created_at timestamp of the row.
CreatedAt time.Time `json:"created_at" csv:"created_at"`
// ModifiedAt is the modified_at timestamp of the row; gorm indexes it in descending order as idx_ast_modified_at.
ModifiedAt time.Time `json:"modified_at" csv:"modified_at" gorm:"index:idx_ast_modified_at,sort:desc"`
// SiteID is the site_id value of the row; gorm column size is 41.
SiteID string `json:"site_id" csv:"site_id" gorm:"size:41"`
// State is the state value of the row.
State string `json:"state" csv:"state"`
}
// AssetFeed is a representation of the assets feed.
type AssetFeed struct {
// Limit is sent as the Limit parameter of the feed request made by Export.
Limit int
// Incremental controls truncation: when it is false, Export initialises the
// feed with Truncate set so existing data is deleted before new rows are written.
Incremental bool
}
// Name reports the name of this feed, which is always "assets".
func (f *AssetFeed) Name() string {
	const feedName = "assets"
	return feedName
}
// HasRemainingInformation tells whether the feed returns remaining items
// information. For the assets feed the answer is always true.
func (f *AssetFeed) HasRemainingInformation() bool {
	const reportsRemaining = true
	return reportsRemaining
}
// Model returns the model of a single feed row: a zero-valued Asset.
func (f *AssetFeed) Model() interface{} {
	var row Asset
	return row
}
// RowsModel returns the model of a collection of feed rows: a pointer to an
// empty (non-nil) slice of *Asset.
func (f *AssetFeed) RowsModel() interface{} {
	rows := []*Asset{}
	return &rows
}
// PrimaryKey returns the primary key column(s) of the feed; for assets this is
// the single column "asset_id".
func (f *AssetFeed) PrimaryKey() []string {
	primaryKey := []string{"asset_id"}
	return primaryKey
}
// Columns returns the column names of a feed row. The primary key column
// ("asset_id") is not part of this list.
func (f *AssetFeed) Columns() []string {
	columns := []string{
		"code", "type_id", "type_name", "fields",
		"created_at", "modified_at", "site_id", "state",
	}
	return columns
}
// Order returns the ordering used when retrieving an export; assets are
// ordered by "asset_id".
func (f *AssetFeed) Order() string {
	const orderBy = "asset_id"
	return orderBy
}
// CreateSchema asks the supplied exporter to create the schema for this feed,
// handing it a pointer to an empty slice of *Asset as the rows model. The
// exporter's error, if any, is returned unchanged.
func (f *AssetFeed) CreateSchema(exporter Exporter) error {
	rows := []*Asset{}
	return exporter.CreateSchema(f, &rows)
}
// Export exports the assets feed to the supplied exporter.
//
// The feed is first initialised on the exporter; existing data is truncated
// unless f.Incremental is set. DrainFeed is then run against "/feed/assets"
// with f.Limit as the request limit. Each response is decoded into Asset rows,
// written to the exporter in batches sized from the exporter's parameter
// limit, and followed by a status update and a progress log entry. When
// draining completes, the result of exporter.FinaliseExport is returned.
//
// Initialisation, decoding and draining errors are wrapped with context using
// %w; row-write errors are passed through events.WrapEventError.
func (f *AssetFeed) Export(ctx context.Context, apiClient *httpapi.Client, exporter Exporter, orgID string) error {
	feedLog := logger.GetLogger().With("feed", f.Name(), "org_id", orgID)
	exportStatus := GetExporterStatus()

	initOpts := &InitFeedOptions{
		// With incremental refresh disabled, delete existing data so that there are no duplicates.
		Truncate: !f.Incremental,
	}
	if err := exporter.InitFeed(f, initOpts); err != nil {
		return fmt.Errorf("init feed: %w", err)
	}

	// writeBatch hands a single batch of rows to the exporter.
	writeBatch := func(batch []*Asset) error {
		if err := exporter.WriteRows(f, batch); err != nil {
			return events.WrapEventError(err, "write rows")
		}
		return nil
	}

	// handlePage processes one feed response: decode, persist, report progress.
	handlePage := func(resp *GetFeedResponse) error {
		var assets []*Asset
		if err := json.Unmarshal(resp.Data, &assets); err != nil {
			return fmt.Errorf("map data: %w", err)
		}

		if len(assets) > 0 {
			// Work out how many rows fit into one DB insert: the parameter limit
			// divided by the column count plus a buffer to account for primary keys.
			paramsPerRow := len(f.Columns()) + 4
			batchSize := exporter.ParameterLimit() / paramsPerRow
			if err := util.SplitSliceInBatch(batchSize, assets, writeBatch); err != nil {
				return err
			}
		}

		remaining := resp.Metadata.RemainingRecords
		exportStatus.UpdateStatus(f.Name(), remaining, exporter.GetDuration().Milliseconds())

		feedLog.With(
			"estimated_remaining", remaining,
			"duration_ms", apiClient.Duration.Milliseconds(),
			"export_duration_ms", exporter.GetDuration().Milliseconds(),
		).Info("export batch complete")
		return nil
	}

	request := &GetFeedRequest{
		InitialURL: "/feed/assets",
		Params:     GetFeedParams{Limit: f.Limit},
	}
	if err := DrainFeed(ctx, apiClient, request, handlePage); err != nil {
		return fmt.Errorf("assets feed %q: %w", f.Name(), err)
	}

	finalRows := []*Asset{}
	return exporter.FinaliseExport(f, &finalRows)
}