Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/helper/api_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ type TestTable struct {
common.NoPKModel
}

type TestTable2 struct {
Email string `gorm:"primaryKey;type:varchar(255)"`
Name string `gorm:"type:varchar(255)"`
common.NoPKModel
}

var TestTableData *TestTable = &TestTable{
Email: "test@test.com",
Name: "test",
Expand Down
3 changes: 2 additions & 1 deletion plugins/helper/api_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package helper

import (
"fmt"
"github.com/merico-dev/lake/models/common"
"reflect"

"github.com/merico-dev/lake/models/common"

"github.com/merico-dev/lake/plugins/core"
)

Expand Down
179 changes: 179 additions & 0 deletions plugins/helper/api_extractor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package helper

import (
"context"
"database/sql"
"fmt"
"reflect"
"testing"
"time"

"github.com/agiledragon/gomonkey/v2"
"github.com/magiconair/properties/assert"
"github.com/sirupsen/logrus"
"gorm.io/datatypes"
"gorm.io/gorm"
)

var TestRawData *RawData = &RawData{
ID: 110100100116102,
Params: "TestParams",
Data: []byte{46, 99, 111, 109},
Url: "http://devlake.io/",
Input: datatypes.JSON(TestRawMessage),
CreatedAt: time.Now(),
}

// go test -gcflags=all=-l
func CreateTestApiExtractor(t *testing.T) (*ApiExtractor, error) {
var ctx context.Context
ctx, Cancel = context.WithCancel(context.Background())
return NewApiExtractor(ApiExtractorArgs{
RawDataSubTaskArgs: RawDataSubTaskArgs{
Ctx: &DefaultSubTaskContext{
defaultExecContext: newDefaultExecContext(GetConfigForTest("../../"), NewDefaultLogger(logrus.New(), "Test", make(map[string]*logrus.Logger)), &gorm.DB{}, ctx, "Test", nil, nil),
},
Table: TestTable{}.TableName(),
Params: &TestParam{
Test: TestUrlParam,
},
},
BatchSize: TestBatchSize,
Extract: func(row *RawData) ([]interface{}, error) {
assert.Equal(t, row, TestRawData)
results := make([]interface{}, 0, TestDataCount)
for i := 0; i < TestDataCount; i++ {
results = append(results, TestTableData)
}
return results, nil
},
})
}

func TestApiExtractorExecute(t *testing.T) {
MockDB(t)
defer UnMockDB()

gt.Reset()
gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
assert.Equal(t, name, "_raw_"+TestTableData.TableName())
return db
},
)

apiExtractor, _ := CreateTestApiExtractor(t)

datacount := TestDataCount
gn := gomonkey.ApplyMethod(reflect.TypeOf(&sql.Rows{}), "Next", func(r *sql.Rows) bool {
if datacount > 0 {
datacount--
return true
} else {
return false
}
})
defer gn.Reset()

gcl := gomonkey.ApplyMethod(reflect.TypeOf(&sql.Rows{}), "Close", func(r *sql.Rows) error {
return nil
})
defer gcl.Reset()

gs.Reset()

scanrowTimes := 0
gs = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "ScanRows", func(db *gorm.DB, rows *sql.Rows, dest interface{}) error {
scanrowTimes++
*dest.(*RawData) = *TestRawData
return nil
},
)

fortypeTimes := 0
gf := gomonkey.ApplyMethod(reflect.TypeOf(&BatchSaveDivider{}), "ForType", func(d *BatchSaveDivider, rowType reflect.Type) (*BatchSave, error) {
fortypeTimes++
assert.Equal(t, rowType, reflect.TypeOf(TestTableData))
err := d.onNewBatchSave(rowType)
assert.Equal(t, err, nil)

return &BatchSave{}, nil
})
defer gf.Reset()

addTimes := 0
gsv := gomonkey.ApplyMethod(reflect.TypeOf(&BatchSave{}), "Add", func(c *BatchSave, slot interface{}) error {
addTimes++
assert.Equal(t, slot, TestTableData)
return nil
})
defer gsv.Reset()

// begin testing
err := apiExtractor.Execute()
assert.Equal(t, err, nil)
assert.Equal(t, scanrowTimes, TestDataCount)
assert.Equal(t, fortypeTimes, TestDataCount*TestDataCount)
assert.Equal(t, addTimes, TestDataCount*TestDataCount)
}

func TestApiExtractorExecute_Cancel(t *testing.T) {
MockDB(t)
defer UnMockDB()

gt.Reset()
gt = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Table", func(db *gorm.DB, name string, args ...interface{}) *gorm.DB {
assert.Equal(t, name, "_raw_"+TestTableData.TableName())
return db
},
)

apiExtractor, _ := CreateTestApiExtractor(t)

gn := gomonkey.ApplyMethod(reflect.TypeOf(&sql.Rows{}), "Next", func(r *sql.Rows) bool {
// death loop for testing cancel
return true
})
defer gn.Reset()

gcl := gomonkey.ApplyMethod(reflect.TypeOf(&sql.Rows{}), "Close", func(r *sql.Rows) error {
return nil
})
defer gcl.Reset()

gs.Reset()

scanrowTimes := 0
gs = gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "ScanRows", func(db *gorm.DB, rows *sql.Rows, dest interface{}) error {
scanrowTimes++
*dest.(*RawData) = *TestRawData
return nil
},
)

fortypeTimes := 0
gf := gomonkey.ApplyMethod(reflect.TypeOf(&BatchSaveDivider{}), "ForType", func(d *BatchSaveDivider, rowType reflect.Type) (*BatchSave, error) {
fortypeTimes++
assert.Equal(t, rowType, reflect.TypeOf(TestTableData))
err := d.onNewBatchSave(rowType)
assert.Equal(t, err, nil)

return &BatchSave{}, nil
})
defer gf.Reset()

addTimes := 0
gsv := gomonkey.ApplyMethod(reflect.TypeOf(&BatchSave{}), "Add", func(c *BatchSave, slot interface{}) error {
addTimes++
assert.Equal(t, slot, TestTableData)
return nil
})
defer gsv.Reset()

go func() {
time.Sleep(time.Duration(500) * time.Microsecond)
Cancel()
}()

err := apiExtractor.Execute()
assert.Equal(t, err, fmt.Errorf("context canceled"))
}
16 changes: 8 additions & 8 deletions plugins/helper/batch_save_divider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ type OnNewBatchSave func(rowType reflect.Type) error

// Holds a map of BatchInsert, return `*BatchInsert` for a specific records, so caller can do batch operation for it
type BatchSaveDivider struct {
db *gorm.DB
batches map[reflect.Type]*BatchSave
batchSize int
onNewBatchInsert OnNewBatchSave
db *gorm.DB
batches map[reflect.Type]*BatchSave
batchSize int
onNewBatchSave OnNewBatchSave
}

// Return a new BatchInsertDivider instance
Expand All @@ -26,10 +26,10 @@ func NewBatchSaveDivider(db *gorm.DB, batchSize int) *BatchSaveDivider {
}

func (d *BatchSaveDivider) OnNewBatchSave(cb OnNewBatchSave) {
d.onNewBatchInsert = cb
d.onNewBatchSave = cb
}

// return *BatchInsert for specified type
// return *BatchSave for specified type
func (d *BatchSaveDivider) ForType(rowType reflect.Type) (*BatchSave, error) {
// get the cache for the specific type
batch := d.batches[rowType]
Expand All @@ -40,8 +40,8 @@ func (d *BatchSaveDivider) ForType(rowType reflect.Type) (*BatchSave, error) {
if err != nil {
return nil, err
}
if d.onNewBatchInsert != nil {
err = d.onNewBatchInsert(rowType)
if d.onNewBatchSave != nil {
err = d.onNewBatchSave(rowType)
if err != nil {
return nil, err
}
Expand Down
48 changes: 48 additions & 0 deletions plugins/helper/batch_save_divider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package helper

import (
"reflect"
"testing"

"github.com/stretchr/testify/assert"
"gorm.io/gorm"
)

// go test -gcflags=all=-l

var TestBatchSize int = 100

func CreateTestBatchSaveDivider() *BatchSaveDivider {
return NewBatchSaveDivider(&gorm.DB{}, TestBatchSize)
}

func TestBatchSaveDivider(t *testing.T) {
MockDB(t)
defer UnMockDB()
batchSaveDivider := CreateTestBatchSaveDivider()
initTimes := 0

batchSaveDivider.OnNewBatchSave(func(rowType reflect.Type) error {
initTimes++
return nil
})

var err error
var b1 *BatchSave
var b2 *BatchSave
var b3 *BatchSave

// test if it saved and only saved once for one Type
b1, err = batchSaveDivider.ForType(reflect.TypeOf(TestTableData))
assert.Equal(t, initTimes, 1)
assert.Equal(t, err, nil)
b2, err = batchSaveDivider.ForType(reflect.TypeOf(&TestTable2{}))
assert.Equal(t, initTimes, 2)
assert.Equal(t, err, nil)
b3, err = batchSaveDivider.ForType(reflect.TypeOf(TestTableData))
assert.Equal(t, initTimes, 2)
assert.Equal(t, err, nil)

assert.NotEqual(t, b1, b2)
assert.Equal(t, b1, b3)
}
38 changes: 38 additions & 0 deletions plugins/helper/batch_save_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"reflect"
"testing"

"github.com/agiledragon/gomonkey/v2"
"github.com/merico-dev/lake/models/domainlayer"
"github.com/merico-dev/lake/models/domainlayer/ticket"
"github.com/stretchr/testify/assert"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

func Test_getPrimaryKeyValue(t *testing.T) {
Expand Down Expand Up @@ -115,3 +118,38 @@ func Test_hasPrimaryKey(t *testing.T) {
})
}
}

// go test -gcflags=all=-l
func TestBatchSave(t *testing.T) {
db := &gorm.DB{}
sqlTimes := 0

gcl := gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Clauses", func(db *gorm.DB, conds ...clause.Expression) (tx *gorm.DB) {
sqlTimes++
return db
},
)
defer gcl.Reset()

gcr := gomonkey.ApplyMethod(reflect.TypeOf(&gorm.DB{}), "Create", func(db *gorm.DB, value interface{}) *gorm.DB {
assert.Equal(t, TestTableData, value.([]*TestTable)[0])
return db
},
)
defer gcr.Reset()

TestBatchSize = 1
rowType := reflect.TypeOf(TestTableData)
batch, err := NewBatchSave(db, rowType, TestBatchSize)

// test diff type
assert.Equal(t, err, nil)
err = batch.Add(&TestBatchSize)
assert.NotEqual(t, err, nil)

// test right type
err = batch.Add(TestTableData)
assert.Equal(t, err, nil)

assert.Equal(t, sqlTimes, 1)
}
Loading