diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 328c8a9a..83eac210 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,9 +13,9 @@ jobs: with: go-version: ${{ matrix.go-version }} - name: Checkout code - uses: actions/checkout@v2.4.0 + uses: actions/checkout@v3 - name: Linting - uses: golangci/golangci-lint-action@v2.5.2 + uses: golangci/golangci-lint-action@v3 with: version: v1.29 - name: test diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index a00517f8..e2d14ab3 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -39,7 +39,7 @@ jobs: # Initializes the CodeQL tools for scanning. - name: Initialize CodeQL - uses: github/codeql-action/init@v1 + uses: github/codeql-action/init@v2 with: languages: ${{ matrix.language }} # If you wish to specify custom queries, you can do so here or in a config file. @@ -50,7 +50,7 @@ jobs: # Autobuild attempts to build any compiled languages (C/C++, C#, or Java). # If this step fails, then you should remove it and run the build manually (see below) - name: Autobuild - uses: github/codeql-action/autobuild@v1 + uses: github/codeql-action/autobuild@v2 # ℹī¸ Command-line programs to run using the OS shell. # 📚 https://git.io/JvXDl @@ -64,4 +64,4 @@ jobs: # make release - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v1 + uses: github/codeql-action/analyze@v2 diff --git a/item.go b/item.go index 5a311761..b78a8b34 100644 --- a/item.go +++ b/item.go @@ -84,6 +84,39 @@ func send(ctx context.Context, ch chan<- Item, items ...interface{}) { } } +func sendSingleItem(ctx context.Context, ch chan<- Item, strategy CloseChannelStrategy, items ...interface{}) { + if strategy == CloseChannel { + defer close(ch) + } + for _, currentItem := range items { + switch item := currentItem.(type) { + default: + rt := reflect.TypeOf(item) + switch rt.Kind() { + default: + Of(item).SendContext(ctx, ch) + case reflect.Chan: + in := reflect.ValueOf(currentItem) + for { + v, ok := in.Recv() + if !ok { + return + } + currentItem := v.Interface() + switch item := currentItem.(type) { + default: + Of(item).SendContext(ctx, ch) + case error: + Error(item).SendContext(ctx, ch) + } + } + } + case error: + Error(item).SendContext(ctx, ch) + } + } +} + // Error checks if an item is an error. func (i Item) Error() bool { return i.E != nil diff --git a/iterable_just.go b/iterable_just.go index 0856a8ff..3b30d02a 100644 --- a/iterable_just.go +++ b/iterable_just.go @@ -18,6 +18,6 @@ func (i *justIterable) Observe(opts ...Option) <-chan Item { option := parseOptions(append(i.opts, opts...)...) next := option.buildChannel() - go SendItems(option.buildContext(emptyContext), next, CloseChannel, i.items) + go sendSingleItem(option.buildContext(emptyContext), next, CloseChannel, i.items...) return next } diff --git a/single.go b/single.go index e4d70d28..a1fa1ce7 100644 --- a/single.go +++ b/single.go @@ -1,6 +1,8 @@ package rxgo -import "context" +import ( + "context" +) // Single is a observable with a single element. type Single interface {