Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions backend/helpers/pluginhelper/api/api_extractor_stateful.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ func (extractor *StatefulApiExtractor[InputType]) Execute() errors.Error {
if !db.HasTable(table) {
return nil
}

clauses := []dal.Clause{
dal.Select("id"),
dal.From(table),
dal.Where("params = ?", params),
dal.Orderby("id ASC"),
Expand All @@ -116,35 +118,41 @@ func (extractor *StatefulApiExtractor[InputType]) Execute() errors.Error {
}
clauses = append(clauses, dal.Where("created_at < ? ", extractor.GetUntil()))

// first get total count for progress tracking
count, err := db.Count(clauses...)
if err != nil {
return errors.Default.Wrap(err, "error getting count of clauses")
return errors.Default.Wrap(err, "error getting count of records")
}
cursor, err := db.Cursor(clauses...)
logger.Info("get data from %s where params=%s and got %d with clauses %+v", table, params, count, clauses)

// get all IDs
var ids []uint64
err = db.Pluck("id", &ids, clauses...)
if err != nil {
return errors.Default.Wrap(err, "error running DB query")
return errors.Default.Wrap(err, "error getting IDs")
}
logger.Info("get data from %s where params=%s and got %d with clauses %+v", table, params, count, clauses)

defer cursor.Close()
// batch save divider
divider := NewBatchSaveDivider(extractor.SubTaskContext, extractor.GetBatchSize(), table, params)
divider.SetIncrementalMode(extractor.IsIncremental())

// progress
extractor.SetProgress(0, -1)
ctx := extractor.GetContext()
// iterate all rows
for cursor.Next() {

// process each record individually by ID
for _, id := range ids {
select {
case <-ctx.Done():
return errors.Convert(ctx.Err())
default:
}

// load full record by ID
row := &RawData{}
err = db.Fetch(cursor, row)
err := db.First(row, dal.From(table), dal.Where("id = ?", id))
if err != nil {
return errors.Default.Wrap(err, "error fetching row")
return errors.Default.Wrap(err, "error loading full row by ID")
}

body := new(InputType)
Expand All @@ -164,6 +172,7 @@ func (extractor *StatefulApiExtractor[InputType]) Execute() errors.Error {
if err != nil {
return errors.Default.Wrap(err, "error calling plugin Extract implementation")
}

for _, result := range results {
// get the batch operator for the specific type
batch, err := divider.ForType(reflect.TypeOf(result))
Expand All @@ -184,16 +193,13 @@ func (extractor *StatefulApiExtractor[InputType]) Execute() errors.Error {
}
extractor.IncProgress(1)
}
if err := cursor.Err(); err != nil {
return errors.Default.Wrap(err, "error occurred during database cursor iteration in StatefulApiExtractor")
}

// save the last batches
err = divider.Close()
if err != nil {
return err
}
// save the incremantal state
// save the incremental state
return extractor.SubtaskStateManager.Close()
}

Expand Down
Loading