Skip to content

Commit

Permalink
refactor: add subtask register for github
Browse files Browse the repository at this point in the history
  • Loading branch information
chenggui53 committed Jun 22, 2023
1 parent 92cd274 commit 3e3eda0
Show file tree
Hide file tree
Showing 72 changed files with 804 additions and 148 deletions.
2 changes: 2 additions & 0 deletions backend/core/plugin/plugin_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package plugin

import (
"context"

corecontext "github.com/apache/incubator-devlake/core/context"
"github.com/apache/incubator-devlake/core/errors"
)
Expand Down Expand Up @@ -98,6 +99,7 @@ type SubTaskMeta struct {
Description string
DomainTypes []string
Dependencies []*SubTaskMeta
DependencyTables []string
}

// PluginTask Implement this interface to let framework run tasks for you
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package subtaskmeta_sorter

import (
"fmt"

"github.com/apache/incubator-devlake/core/plugin"
"sort"
)

type DependencySorter struct {
Expand All @@ -32,11 +32,12 @@ func NewDependencySorter(metas []*plugin.SubTaskMeta) SubTaskMetaSorter {
}

func (d *DependencySorter) Sort() ([]plugin.SubTaskMeta, error) {
return topologicalSort(d.metas)
return dependenciesTopologicalSort(d.metas)
}

// stable topological sort
func topologicalSort(metas []*plugin.SubTaskMeta) ([]plugin.SubTaskMeta, error) {
func dependenciesTopologicalSort(metas []*plugin.SubTaskMeta) ([]plugin.SubTaskMeta, error) {
// 1. construct data
// which state will make a cycle
dependenciesMap := make(map[string][]string)
nameMetaMap := make(map[string]*plugin.SubTaskMeta)
Expand All @@ -60,59 +61,20 @@ func topologicalSort(metas []*plugin.SubTaskMeta) ([]plugin.SubTaskMeta, error)
}
}

orderedSubtaskList := make([]plugin.SubTaskMeta, 0)
for {
if len(dependenciesMap) == 0 {
break
}

tmpList := make([]string, 0)
for key, item := range dependenciesMap {
if len(item) == 0 {
tmpList = append(tmpList, key)
}
}
if len(tmpList) == 0 {
return nil, fmt.Errorf("cyclic dependency detected: %v", dependenciesMap)
}

// remove item in dependencies map
for key, value := range dependenciesMap {
if contains(tmpList, key) {
delete(dependenciesMap, key)
} else {
dependenciesMap[key] = removeElements(value, tmpList)
}
}

sort.Strings(tmpList)
// convert item to subtaskmeta by name, and append to orderedSubtaskList
for _, item := range tmpList {
value, ok := nameMetaMap[item]
if !ok {
return nil, fmt.Errorf("illeagal subtaskmeta detected %s", item)
}
orderedSubtaskList = append(orderedSubtaskList, *value)
}
}
return orderedSubtaskList, nil
}

func contains[T comparable](itemList []T, item T) bool {
for _, newItem := range itemList {
if item == newItem {
return true
}
// sort
orderStrList, err := topologicalSortSameElements(dependenciesMap)
if err != nil {
return nil, err
}
return false
}

func removeElements[T comparable](raw, toRemove []T) []T {
newList := make([]T, 0)
for _, item := range raw {
if !contains(toRemove, item) {
newList = append(newList, item)
// gen list by sorted name list and return
orderedSubtaskList := make([]plugin.SubTaskMeta, 0)
for _, item := range orderStrList {
value, ok := nameMetaMap[item]
if !ok {
return nil, fmt.Errorf("illeagal subtaskmeta detected %s", item)
}
orderedSubtaskList = append(orderedSubtaskList, *value)
}
return newList
return orderedSubtaskList, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ limitations under the License.
package subtaskmeta_sorter

import (
"github.com/apache/incubator-devlake/core/plugin"
"reflect"
"testing"

"github.com/apache/incubator-devlake/core/plugin"
)

func Test_topologicalSort(t *testing.T) {
Expand Down Expand Up @@ -85,13 +86,13 @@ func Test_topologicalSort(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := topologicalSort(tt.args.metas)
got, err := dependenciesTopologicalSort(tt.args.metas)
if (err != nil) != tt.wantErr {
t.Errorf("topologicalSort() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("dependenciesTopologicalSort() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("topologicalSort() got = %v, want %v", got, tt.want)
t.Errorf("dependenciesTopologicalSort() got = %v, want %v", got, tt.want)
}
})
}
Expand Down
141 changes: 141 additions & 0 deletions backend/helpers/pluginhelper/subtaskmeta_sorter/table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package subtaskmeta_sorter

import (
"fmt"
"github.com/apache/incubator-devlake/core/plugin"
"sort"
)

// TableSorter is the sorter returned by NewTableSorter. Its Sort method orders
// the stored subtask metas with dependencyTableTopologicalSort, i.e. based on
// the DependencyTables declared on each plugin.SubTaskMeta.
type TableSorter struct {
// metas is the list of subtask metas handed to NewTableSorter.
metas []*plugin.SubTaskMeta
}

// NewTableSorter wraps metas in a TableSorter and returns it as a
// SubTaskMetaSorter. The slice is stored as given, without copying.
func NewTableSorter(metas []*plugin.SubTaskMeta) SubTaskMetaSorter {
	sorter := new(TableSorter)
	sorter.metas = metas
	return sorter
}

// Sort returns the subtask metas held by the sorter in the order computed by
// dependencyTableTopologicalSort, or the error that function reports.
func (d *TableSorter) Sort() ([]plugin.SubTaskMeta, error) {
	sorted, err := dependencyTableTopologicalSort(d.metas)
	return sorted, err
}

// SubtaskPrefix is the leading part of a subtask name. The table sorter in
// this file reads the first 7 bytes of a subtask name as its SubtaskPrefix and
// treats the remainder as the subtask's class name.
type SubtaskPrefix string

// Known subtask prefixes. Each one is exactly 7 bytes long, which is the
// length the sorter slices off the front of a subtask name.
const (
prefixCollect SubtaskPrefix = "collect"
prefixExtract SubtaskPrefix = "extract"
prefixConvert SubtaskPrefix = "convert"
)

// genClassNameByMetaName derives the class name of a subtask from its name by
// dropping the first 7 bytes (the length of the collect/extract/convert
// prefixes). The prefix itself is not validated here.
//
// It returns an error when rawName is 7 bytes or shorter, i.e. when nothing
// would be left after the prefix.
func genClassNameByMetaName(rawName string) (string, error) {
	const prefixLen = 7
	if len(rawName) <= prefixLen {
		return "", fmt.Errorf("got illeagal raw name = %s", rawName)
	}
	return rawName[prefixLen:], nil
}

// dependencyTableTopologicalSort orders subtasks based on the tables they
// declare in DependencyTables.
//
// Subtasks are grouped into classes: the class name is the subtask name with
// its 7-byte prefix (collect/extract/convert) removed. Every subtask of a class
// must declare the same set of tables, otherwise an error is returned. The
// class names are ordered by topologicalSortDifferentElements (defined
// elsewhere in this package) and, within a class, subtasks are emitted in
// collect, extract, convert order.
//
// The input metas and their DependencyTables slices are never modified; all
// sorting happens on private copies.
//
// An error is returned when a meta is nil, a name is too short to carry a
// prefix, the tables of a class disagree, the class ordering fails, a prefix is
// unknown, a class contains the same prefix twice, or the subtasks present in a
// class do not fit the collect[/extract[/convert]] layout.
func dependencyTableTopologicalSort(metas []*plugin.SubTaskMeta) ([]plugin.SubTaskMeta, error) {
	// TODO: can reflection be used to tell collectors, extractors and converters apart?
	// Different classes are assumed to have no dependency relation other than
	// the one expressed through their tables.

	// 1. construct data to sort
	classNameToSubtaskListMap := make(map[string][]*plugin.SubTaskMeta) // class name -> subtasks of that class
	classNameToTableListMap := make(map[string][]string)                // class name -> tables of that class

	for _, metaItem := range metas {
		if metaItem == nil {
			return nil, fmt.Errorf("got nil subtask meta")
		}
		taskClassName, err := genClassNameByMetaName(metaItem.Name)
		if err != nil {
			return nil, err
		}
		classNameToSubtaskListMap[taskClassName] = append(classNameToSubtaskListMap[taskClassName], metaItem)

		// Work on a private copy so that the sort.Strings calls below never
		// reorder the caller's DependencyTables. nil stays nil.
		tables := metaItem.DependencyTables
		if tables != nil {
			tables = append(make([]string, 0, len(tables)), tables...)
		}

		known, seen := classNameToTableListMap[taskClassName]
		if !seen {
			classNameToTableListMap[taskClassName] = tables
			continue
		}
		// Subtasks of one class must define the same tables, in any order.
		if len(known) != len(tables) {
			return nil, fmt.Errorf("got different table list in class %s", taskClassName)
		}
		sort.Strings(known) // sorts the map's own copy in place
		sort.Strings(tables)
		for index, knownItem := range known {
			if knownItem != tables[index] {
				return nil, fmt.Errorf("got different table list in class %s", taskClassName)
			}
		}
	}

	// 2. sort
	sortedNameList, err := topologicalSortDifferentElements(classNameToTableListMap)
	if err != nil {
		return nil, err
	}

	// 3. gen subtaskmeta list by sorted data and return
	sortedSubtaskMetaList := make([]plugin.SubTaskMeta, 0, len(metas))
	for _, nameItem := range sortedNameList {
		value, ok := classNameToSubtaskListMap[nameItem]
		if !ok {
			return nil, fmt.Errorf("failed get subtask list by class name = %s", nameItem)
		}
		// Slot 0 = collect, slot 1 = extract, slot 2 = convert.
		tmpList := make([]plugin.SubTaskMeta, len(value))
		filled := make([]bool, len(value))
		for _, subtaskItem := range value {
			// Name is longer than 7 bytes here: genClassNameByMetaName
			// rejected anything shorter in step 1.
			var slot int
			switch SubtaskPrefix(subtaskItem.Name[:7]) {
			case prefixCollect:
				slot = 0
			case prefixExtract:
				// An extractor needs a second slot; without this check a
				// class holding only an extractor would index out of range.
				if len(value) < 2 {
					return nil, fmt.Errorf("got wrong length of list with extract subtask %s", subtaskItem.Name)
				}
				slot = 1
			case prefixConvert:
				if len(value) != 3 {
					return nil, fmt.Errorf("got wrong length of list with extrac subtask")
				}
				slot = 2
			default:
				return nil, fmt.Errorf("got wrong length of subtask %v", subtaskItem)
			}
			// Two subtasks with the same prefix would overwrite each other and
			// leave a zero-value SubTaskMeta in the result.
			if filled[slot] {
				return nil, fmt.Errorf("got duplicate subtask %s in class %s", subtaskItem.Name, nameItem)
			}
			filled[slot] = true
			tmpList[slot] = *subtaskItem
		}
		sortedSubtaskMetaList = append(sortedSubtaskMetaList, tmpList...)
	}
	return sortedSubtaskMetaList, nil
}

// GetSortedClassName is a placeholder that currently always returns nil.
//
// TODO: return the sorted subtask class list; different classes can run
// concurrently.
func GetSortedClassName() []string {
	var classNames []string
	return classNames
}

// GetSubtaskMetasByClassName is a placeholder that currently ignores className
// and always returns nil.
//
// TODO: return the subtask list of the given class; the subtasks of one class
// should run sequentially.
func GetSubtaskMetasByClassName(className string) []*plugin.SubTaskMeta {
	var metas []*plugin.SubTaskMeta
	return metas
}
85 changes: 85 additions & 0 deletions backend/helpers/pluginhelper/subtaskmeta_sorter/table_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package subtaskmeta_sorter

import (
"github.com/apache/incubator-devlake/core/plugin"
"reflect"
"testing"
)

// Test_dependencyTableTopologicalSort runs dependencyTableTopologicalSort over
// a small table of collector-only fixtures and compares the returned metas and
// the error flag with the expectations of each case.
func Test_dependencyTableTopologicalSort(t *testing.T) {
	// collector builds a fixture whose name is the collect prefix plus suffix.
	collector := func(suffix string, tables ...string) plugin.SubTaskMeta {
		return plugin.SubTaskMeta{
			Name:             string(prefixCollect) + suffix,
			DependencyTables: tables,
		}
	}

	// NOTE(review): pluginA uses "Table1" with a capital T while the other
	// fixtures use "table1" — confirm the differing case is intentional.
	pluginA := collector("A", "Table1")
	pluginB := collector("B", "table2")
	pluginC := collector("C", "table1", "table2")
	pluginD := collector("D", "table1", "table2")

	cases := []struct {
		name    string
		metas   []*plugin.SubTaskMeta
		want    []plugin.SubTaskMeta
		wantErr bool
	}{
		{
			name:    "correct stable sort",
			metas:   []*plugin.SubTaskMeta{&pluginA, &pluginB, &pluginC},
			want:    []plugin.SubTaskMeta{pluginA, pluginB, pluginC},
			wantErr: false,
		},
		{
			name:    "cycle error",
			metas:   []*plugin.SubTaskMeta{&pluginC, &pluginD},
			want:    nil,
			wantErr: true,
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			got, err := dependencyTableTopologicalSort(tc.metas)
			if failed := err != nil; failed != tc.wantErr {
				t.Errorf("dependencyTableTopologicalSort() error = %v, wantErr %v", err, tc.wantErr)
				return
			}
			if reflect.DeepEqual(got, tc.want) {
				return
			}
			t.Errorf("dependencyTableTopologicalSort() got = %v, want %v", got, tc.want)
		})
	}
}
Loading

0 comments on commit 3e3eda0

Please sign in to comment.