/
feed_action_assignees.go
136 lines (111 loc) · 4.2 KB
/
feed_action_assignees.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package feed
import (
"context"
"encoding/json"
"time"
"github.com/SafetyCulture/iauditor-exporter/internal/app/api"
"github.com/SafetyCulture/iauditor-exporter/internal/app/util"
)
// ActionAssignee represents a row from the action_assignees feed.
// Rows are decoded from the feed's JSON payload using the json tags; the gorm
// tags declare the primary key, indexes and column sizes. The csv tags
// presumably name the columns of the CSV export (confirm against the exporter).
type ActionAssignee struct {
// ID is the primary key of the row.
ID string `json:"id" csv:"id" gorm:"primarykey;size:375"`
// ActionID is indexed on its own (idx_act_action_id); writeRows deletes existing rows by this column.
ActionID string `json:"action_id" csv:"action_id" gorm:"index:idx_act_action_id;size:36"`
AssigneeID string `json:"assignee_id" csv:"assignee_id" gorm:"size:256"`
Type string `json:"type" csv:"type" gorm:"size:10"`
Name string `json:"name" csv:"name"`
// OrganisationID, ModifiedAt (descending) and ExportedAt all belong to the idx_act_asg_modified_at index.
OrganisationID string `json:"organisation_id" csv:"organisation_id" gorm:"index:idx_act_asg_modified_at;size:37"`
ModifiedAt time.Time `json:"modified_at" csv:"modified_at" gorm:"index:idx_act_asg_modified_at,sort:desc"`
// ExportedAt carries gorm's autoUpdateTime tag.
ExportedAt time.Time `json:"exported_at" csv:"exported_at" gorm:"index:idx_act_asg_modified_at;autoUpdateTime"`
}
// ActionAssigneeFeed is a representation of the action_assignees feed
type ActionAssigneeFeed struct {
// ModifiedAfter is sent to the API as the modified-after parameter. Export
// overwrites it with the value returned by the exporter's LastModifiedAt.
ModifiedAfter time.Time
// Incremental controls truncation: when false, Export initialises the feed
// with Truncate set so existing data is removed before writing.
Incremental bool
}
// Name returns the identifier of this feed, "action_assignees".
func (f *ActionAssigneeFeed) Name() string {
	const feedName = "action_assignees"
	return feedName
}
// Model returns a zero-valued ActionAssignee describing a single feed row.
func (f *ActionAssigneeFeed) Model() interface{} {
	var row ActionAssignee
	return row
}
// RowsModel returns a pointer to a fresh, empty slice of feed rows.
func (f *ActionAssigneeFeed) RowsModel() interface{} {
	rows := make([]*ActionAssignee, 0)
	return &rows
}
// PrimaryKey lists the column(s) that make up the primary key of a row.
func (f *ActionAssigneeFeed) PrimaryKey() []string {
	keys := make([]string, 0, 1)
	keys = append(keys, "id")
	return keys
}
// Columns lists the column names of a row. The primary key "id" is not part
// of this list; it is reported by PrimaryKey.
func (f *ActionAssigneeFeed) Columns() []string {
	columns := [...]string{
		"action_id",
		"assignee_id",
		"type",
		"name",
		"organisation_id",
		"modified_at",
		"exported_at",
	}
	return columns[:]
}
// Order returns the ordering applied when an export is retrieved: by action,
// then by assignee.
func (f *ActionAssigneeFeed) Order() string {
	const ordering = "action_id, assignee_id"
	return ordering
}
// CreateSchema asks the supplied exporter to create the schema for this feed,
// passing an empty row slice as the model, and returns the exporter's error.
func (f *ActionAssigneeFeed) CreateSchema(exporter Exporter) error {
	rows := []*ActionAssignee{}
	return exporter.CreateSchema(f, &rows)
}
// writeRows writes rows to the exporter in batches sized to the exporter's
// parameter limit. For each batch it first deletes any existing rows that
// belong to the batch's action IDs and then writes the batch.
//
// ctx is accepted for symmetry with Export but is not used here. Failures are
// reported through util.Check; the returned error is always nil.
func (f *ActionAssigneeFeed) writeRows(ctx context.Context, exporter Exporter, rows []*ActionAssignee) error {
	// Calculate the size of the batch we can insert into the DB at once. Column count + buffer to account for primary keys
	batchSize := exporter.ParameterLimit() / (len(f.Columns()) + 4)
	if batchSize < 1 {
		// A limit smaller than one row's worth of parameters would otherwise
		// give a zero step and the loop below would never advance.
		batchSize = 1
	}

	for start := 0; start < len(rows); start += batchSize {
		end := start + batchSize
		if end > len(rows) {
			end = len(rows)
		}
		batch := rows[start:end]

		// Collect the action IDs of THIS batch. Ranging over the sub-slice by
		// value avoids indexing the full slice with batch-relative positions.
		actionIDs := make([]string, 0, len(batch))
		for _, row := range batch {
			actionIDs = append(actionIDs, row.ActionID)
		}

		// Delete the actions if already exist
		// NOTE(review): if one action's assignees straddle a batch boundary,
		// the later batch's delete also removes the rows the earlier batch
		// just wrote. Confirm whether the feed ordering rules this out.
		err := exporter.DeleteRowsIfExist(f, "action_id IN ?", actionIDs)
		util.Check(err, "Failed to delete rows in exporter")

		err = exporter.WriteRows(f, batch)
		util.Check(err, "Failed to write data to exporter")
	}

	return nil
}
// Export drains the action_assignees feed from the API into the supplied
// exporter for the given organisation.
//
// It initialises the feed on the exporter, resolves the modified-after
// starting point through the exporter, writes every page of rows returned by
// the API, and finally returns the result of finalising the export.
// Intermediate failures are reported through util.Check.
func (f *ActionAssigneeFeed) Export(ctx context.Context, apiClient *api.Client, exporter Exporter, orgID string) error {
	exportLog := util.GetLogger()
	name := f.Name()

	// Delete data if incremental refresh is disabled so there is no duplicates
	initOpts := &InitFeedOptions{Truncate: !f.Incremental}
	exporter.InitFeed(f, initOpts)

	since, err := exporter.LastModifiedAt(f, f.ModifiedAfter, orgID)
	f.ModifiedAfter = since
	util.Check(err, "unable to load modified after")

	exportLog.Infof("%s: exporting for org_id: %s since: %s", name, orgID, f.ModifiedAfter.Format(time.RFC1123))

	request := &api.GetFeedRequest{
		InitialURL: "/feed/action_assignees",
		Params: api.GetFeedParams{
			ModifiedAfter: f.ModifiedAfter,
		},
	}

	// handlePage decodes one page of the feed and hands its rows to writeRows.
	handlePage := func(resp *api.GetFeedResponse) error {
		page := []*ActionAssignee{}

		decodeErr := json.Unmarshal(resp.Data, &page)
		util.Check(decodeErr, "Failed to unmarshal action-assignees data to struct")

		if len(page) > 0 {
			writeErr := f.writeRows(ctx, exporter, page)
			util.Check(writeErr, "Failed to write data to exporter")
		}

		exportLog.Infof("%s: %d remaining", name, resp.Metadata.RemainingRecords)
		return nil
	}

	err = apiClient.DrainFeed(ctx, request, handlePage)
	util.Check(err, "Failed to export feed")

	finalRows := []*ActionAssignee{}
	return exporter.FinaliseExport(f, &finalRows)
}