Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/pipelines/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ POST /pipelines
// @Description }
// @Tags pipelines
// @Accept application/json
// @Param pipeline body string true "json"
// @Param pipeline body models.NewPipeline true "json"
// @Success 200 {object} models.Pipeline
// @Failure 400 {string} errcode.Error "Bad Request"
// @Failure 500 {string} errcode.Error "Internel Error"
// @Failure 500 {string} errcode.Error "Internal Error"
// @Router /pipelines [post]
func Post(c *gin.Context) {
newPipeline := &models.NewPipeline{}
Expand Down
16 changes: 15 additions & 1 deletion helpers/e2ehelper/data_flow_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,18 @@ func (t *DataFlowTester) FlushTabler(dst schema.Tabler) {

// Subtask executes specified subtasks
func (t *DataFlowTester) Subtask(subtaskMeta core.SubTaskMeta, taskData interface{}) {
subtaskCtx := helper.NewStandaloneSubTaskContext(context.Background(), t.Cfg, t.Log, t.Db, t.Name, taskData)
subtaskCtx := t.SubtaskContext(taskData)
err := subtaskMeta.EntryPoint(subtaskCtx)
if err != nil {
panic(err)
}
}

// SubtaskContext creates a subtask context
func (t *DataFlowTester) SubtaskContext(taskData interface{}) core.SubTaskContext {
return helper.NewStandaloneSubTaskContext(context.Background(), t.Cfg, t.Log, t.Db, t.Name, taskData)
}

func filterColumn(column dal.ColumnMeta, opts TableOptions) bool {
for _, ignore := range opts.IgnoreFields {
if column.Name() == ignore {
Expand Down Expand Up @@ -241,6 +246,8 @@ func (t *DataFlowTester) CreateSnapshot(dst schema.Tabler, opts TableOptions) {
forScanValues[i] = new(sql.NullTime)
} else if columnType.ScanType().Name() == `bool` {
forScanValues[i] = new(bool)
} else if columnType.ScanType().Name() == `RawBytes` {
forScanValues[i] = new(sql.NullString)
} else {
forScanValues[i] = new(string)
}
Expand All @@ -267,6 +274,13 @@ func (t *DataFlowTester) CreateSnapshot(dst schema.Tabler, opts TableOptions) {
} else {
values[i] = `0`
}
case *sql.NullString:
value := *forScanValues[i].(*sql.NullString)
if value.Valid {
values[i] = value.String
} else {
values[i] = ``
}
case *string:
values[i] = fmt.Sprint(*forScanValues[i].(*string))
}
Expand Down
66 changes: 40 additions & 26 deletions plugins/helper/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,41 @@ type Iterator interface {

// DalCursorIterator FIXME ...
type DalCursorIterator struct {
db dal.Dal
cursor *sql.Rows
elemType reflect.Type
db dal.Dal
cursor *sql.Rows
elemType reflect.Type
batchSize int
}

// NewDalCursorIterator FIXME ...
func NewDalCursorIterator(db dal.Dal, cursor *sql.Rows, elemType reflect.Type) (*DalCursorIterator, error) {
return NewBatchedDalCursorIterator(db, cursor, elemType, -1)
}

// NewBatchedDalCursorIterator FIXME ...
func NewBatchedDalCursorIterator(db dal.Dal, cursor *sql.Rows, elemType reflect.Type, batchSize int) (*DalCursorIterator, error) {
return &DalCursorIterator{
db: db,
cursor: cursor,
elemType: elemType,
db: db,
cursor: cursor,
elemType: elemType,
batchSize: batchSize,
}, nil
}

// HasNext FIXME ...
// HasNext increments the row curser. If we're at the end, it'll return false.
func (c *DalCursorIterator) HasNext() bool {
return c.cursor.Next()
}

// Fetch FIXME ...
// Fetch if batching is disabled, it'll read a single row, otherwise it'll read as many rows up to the batch size, and the
// runtime return type will be []interface{}. Note, HasNext needs to have been called before invoking this.
func (c *DalCursorIterator) Fetch() (interface{}, error) {
if c.batchSize > 0 {
return c.batchedFetch()
}
if c.batchSize != -1 {
panic("invalid batch size")
}
elem := reflect.New(c.elemType).Interface()
err := c.db.Fetch(c.cursor, elem)
if err != nil {
Expand All @@ -63,7 +77,23 @@ func (c *DalCursorIterator) Fetch() (interface{}, error) {
return elem, nil
}

// Close interator
func (c *DalCursorIterator) batchedFetch() (interface{}, error) {
var elems []interface{}
for i := 1; ; i++ {
elem := reflect.New(c.elemType).Interface()
err := c.cursor.Scan(elem)
if err != nil {
return nil, err
}
elems = append(elems, elem)
if i == c.batchSize || !c.HasNext() {
break
}
}
return elems, nil
}

// Close iterator
func (c *DalCursorIterator) Close() error {
return c.cursor.Close()
}
Expand Down Expand Up @@ -114,22 +144,6 @@ func NewDateIterator(days int) (*DateIterator, error) {
}, nil
}

type QueueIteratorNode struct {
data interface{}
next *QueueIteratorNode
}

func (q *QueueIteratorNode) Next() interface{} {
if q.next == nil {
return nil
}
return q.next
}

func (q *QueueIteratorNode) SetNext(next interface{}) {
q.next, _ = next.(*QueueIteratorNode)
}

type QueueIterator struct {
queue *Queue
}
Expand All @@ -143,7 +157,7 @@ func (q *QueueIterator) Fetch() (interface{}, error) {
}

func (q *QueueIterator) Push(data QueueNode) {
q.queue.PushWitouLock(data)
q.queue.PushWithoutLock(data)
}

func (q *QueueIterator) Close() error {
Expand Down
11 changes: 8 additions & 3 deletions plugins/helper/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@ limitations under the License.

package helper

// ListBaseNode 'abstract' base struct for Nodes that are chained in a linked list manner
type ListBaseNode struct {
next interface{}
next *ListBaseNode
}

func (l *ListBaseNode) Data() interface{} {
panic("list node Data() needs to be implemented by subclasses")
}

func (l *ListBaseNode) Next() interface{} {
Expand All @@ -29,10 +34,10 @@ func (l *ListBaseNode) Next() interface{} {
}

func (l *ListBaseNode) SetNext(next interface{}) {
l.next = next
l.next = next.(*ListBaseNode)
}

// NewListBaseNode create and init a new node
// NewListBaseNode create and init a new node (only to be called by subclasses)
func NewListBaseNode() *ListBaseNode {
return &ListBaseNode{
next: nil,
Expand Down
35 changes: 31 additions & 4 deletions plugins/helper/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
type QueueNode interface {
Next() interface{}
SetNext(next interface{})
Data() interface{}
}

type Queue struct {
Expand All @@ -38,7 +39,7 @@ type Queue struct {
func (q *Queue) Push(node QueueNode) {
q.mux.Lock()
defer q.mux.Unlock()
q.PushWitouLock(node)
q.PushWithoutLock(node)
}

// Pull get a node from queue
Expand All @@ -57,8 +58,8 @@ func (q *Queue) Pull(add *int64) QueueNode {
return node
}

// PushWitouLock is no lock mode of Push
func (q *Queue) PushWitouLock(node QueueNode) {
// PushWithoutLock is no lock mode of Push
func (q *Queue) PushWithoutLock(node QueueNode) {
if q.tail == nil {
q.head = node
q.tail = node
Expand All @@ -70,7 +71,7 @@ func (q *Queue) PushWitouLock(node QueueNode) {
}
}

// PullWitouLock is no lock mode of Pull
// PullWithOutLock is no lock mode of Pull
func (q *Queue) PullWithOutLock() QueueNode {
var node QueueNode = nil

Expand Down Expand Up @@ -125,3 +126,29 @@ func NewQueue() *Queue {
mux: sync.Mutex{},
}
}

type QueueIteratorNode struct {
next *QueueIteratorNode
data interface{}
}

func (q *QueueIteratorNode) Next() interface{} {
if q.next == nil {
return nil
}
return q.next
}

func (q *QueueIteratorNode) SetNext(next interface{}) {
q.next, _ = next.(*QueueIteratorNode)
}

func (q *QueueIteratorNode) Data() interface{} {
return q.data
}

func NewQueueIteratorNode(data interface{}) *QueueIteratorNode {
return &QueueIteratorNode{
data: data,
}
}
2 changes: 1 addition & 1 deletion plugins/jenkins/models/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func (JenkinsJob) TableName() string {
}

type FolderInput struct {
Path string
*helper.ListBaseNode
Path string
}

func NewFolderInput(path string) *FolderInput {
Expand Down
130 changes: 130 additions & 0 deletions plugins/jira/e2e/epic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e
Comment thread
keon94 marked this conversation as resolved.

import (
"github.com/apache/incubator-devlake/helpers/e2ehelper"
"github.com/apache/incubator-devlake/models/common"
"github.com/apache/incubator-devlake/plugins/jira/impl"
"github.com/apache/incubator-devlake/plugins/jira/models"
"github.com/apache/incubator-devlake/plugins/jira/tasks"
"github.com/stretchr/testify/require"
"testing"
)

func TestEpicDataflow(t *testing.T) {
var plugin impl.Jira
dataflowTester := e2ehelper.NewDataFlowTester(t, "jira", plugin)
taskData := &tasks.JiraTaskData{
Options: &tasks.JiraOptions{
ConnectionId: 1,
BoardId: 93,
TransformationRules: tasks.TransformationRules{StoryPointField: "customfield_10024"},
},
}

dataflowTester.ImportCsvIntoRawTable("./raw_tables/_raw_jira_api_issue_types.csv", "_raw_jira_api_issue_types")
dataflowTester.ImportCsvIntoRawTable("./raw_tables/_raw_jira_api_issues.csv", "_raw_jira_api_issues")
dataflowTester.ImportCsvIntoRawTable("./raw_tables/_raw_jira_external_epics.csv", "_raw_jira_api_epics")

dataflowTester.FlushTabler(&models.JiraIssue{})
dataflowTester.FlushTabler(&models.JiraBoardIssue{})
dataflowTester.FlushTabler(&models.JiraSprintIssue{})
dataflowTester.FlushTabler(&models.JiraIssueChangelogs{})
dataflowTester.FlushTabler(&models.JiraIssueChangelogItems{})
dataflowTester.FlushTabler(&models.JiraWorklog{})
dataflowTester.FlushTabler(&models.JiraAccount{})
dataflowTester.FlushTabler(&models.JiraIssueType{})

ctx := dataflowTester.SubtaskContext(taskData)

// run pre-req subtasks
require.NoError(t, tasks.ExtractIssueTypesMeta.EntryPoint(ctx))
require.NoError(t, tasks.ExtractIssuesMeta.EntryPoint(ctx))
dataflowTester.VerifyTableWithOptions(
models.JiraIssue{}, e2ehelper.TableOptions{
CSVRelPath: "./snapshot_tables/_tool_jira_issues_for_external_epics.csv",
TargetFields: nil,
IgnoreFields: nil,
IgnoreTypes: []interface{}{common.NoPKModel{}},
},
)
dataflowTester.VerifyTableWithOptions(
models.JiraBoardIssue{}, e2ehelper.TableOptions{
CSVRelPath: "./snapshot_tables/_tool_jira_board_issues_for_external_epics.csv",
TargetFields: []string{"connection_id", "board_id", "issue_id"},
IgnoreFields: nil,
IgnoreTypes: []interface{}{common.NoPKModel{}},
},
)
t.Run("batch_single", func(t *testing.T) {
// run the part of the collector that queries tools data
iter, err := tasks.GetEpicKeysIterator(ctx.GetDal(), taskData, 1)
require.NoError(t, err)
require.True(t, iter.HasNext())
e1, err := iter.Fetch()
require.NoError(t, err)
require.True(t, iter.HasNext())
e2, err := iter.Fetch()
require.NoError(t, err)
require.False(t, iter.HasNext())
require.Equal(t, 1, len(e1.([]interface{})))
require.Equal(t, 1, len(e2.([]interface{})))
epicKeys := []string{
*(e1.([]interface{})[0].(*string)),
*(e2.([]interface{})[0].(*string)),
}
require.Contains(t, epicKeys, "K5-1")
require.Contains(t, epicKeys, "K5-4")
})
t.Run("batch_multiple", func(t *testing.T) {
// run the part of the collector that queries tools data
iter, err := tasks.GetEpicKeysIterator(ctx.GetDal(), taskData, 2)
require.NoError(t, err)
require.True(t, iter.HasNext())
e, err := iter.Fetch()
require.NoError(t, err)
require.False(t, iter.HasNext())
require.Equal(t, 2, len(e.([]interface{})))
epicKeys := []string{
*(e.([]interface{})[0].(*string)),
*(e.([]interface{})[1].(*string)),
}
require.Contains(t, epicKeys, "K5-1")
require.Contains(t, epicKeys, "K5-4")
})

require.NoError(t, tasks.ExtractEpicsMeta.EntryPoint(ctx))

dataflowTester.VerifyTableWithOptions(
models.JiraBoardIssue{}, e2ehelper.TableOptions{
CSVRelPath: "./snapshot_tables/_tool_jira_board_issues_for_external_epics.csv",
TargetFields: nil,
IgnoreFields: nil,
IgnoreTypes: []interface{}{common.NoPKModel{}},
},
)
dataflowTester.VerifyTableWithOptions(
models.JiraIssue{}, e2ehelper.TableOptions{
CSVRelPath: "./snapshot_tables/_tool_jira_issues_for_external_epics.csv",
TargetFields: nil,
IgnoreFields: nil,
IgnoreTypes: []interface{}{common.NoPKModel{}},
},
)
}
Loading