diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index f79707fb0..e867b8cc4 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -650,14 +650,15 @@ func (c *channelImpl) Send(ctx Context, v interface{}) { return } for { - // Check for closed in the loop as close can be called when send is blocked - if c.closed { - panic("Closed channel") - } if valueConsumed { state.unblocked() return } + + // Check for closed in the loop as close can be called when send is blocked + if c.closed { + panic("Closed channel") + } state.yield(fmt.Sprintf("blocked on %s.Send", c.name)) } } @@ -695,9 +696,6 @@ func (c *channelImpl) Close() { callback.fn(nil, false) } // All blocked sends are going to panic - for _, callback := range c.blockedSends { - callback.fn() - } } // Takes a value and assigns that 'to' value. logs a metric if it is unable to deserialize diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index e2e089d7e..6ae163fb6 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -60,6 +60,8 @@ func (s *WorkflowUnitTest) SetupSuite() { RegisterWorkflow(receiveCorruptSignalOnClosedChannelWorkflowTest) RegisterWorkflow(bufferedChanWorkflowTest) RegisterWorkflow(bufferedChanWithSelectorWorkflowTest) + RegisterWorkflow(closeChannelTest) + RegisterWorkflow(closeChannelInSelectTest) s.activityOptions = ActivityOptions{ ScheduleToStartTimeout: time.Minute, @@ -759,6 +761,57 @@ func (s *WorkflowUnitTest) Test_CorruptedSignalOnClosedChannelWorkflow_Receive_S s.EqualValues(0, len(result)) } +func closeChannelTest(ctx Context) error { + ch := NewChannel(ctx) + Go(ctx, func(ctx Context) { + var dummy struct{} + ch.Receive(ctx, &dummy) + ch.Close() + }) + + ch.Send(ctx, struct{}{}) + return nil +} + +func (s *WorkflowUnitTest) Test_CloseChannelWorkflow() { + env := s.NewTestWorkflowEnvironment() + env.ExecuteWorkflow(closeChannelTest) + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) +} + +func closeChannelInSelectTest(ctx Context) error { + s := NewSelector(ctx) + sendCh := NewChannel(ctx) + receiveCh := NewChannel(ctx) + expectedValue := "expected value" + + Go(ctx, func(ctx Context) { + sendCh.Close() + receiveCh.Send(ctx, expectedValue) + }) + + var v string + s.AddSend(sendCh, struct{}{}, func() { + panic("callback for sendCh should not be executed") + }) + s.AddReceive(receiveCh, func(c Channel, m bool) { + c.Receive(ctx, &v) + }) + s.Select(ctx) + if v != expectedValue { + panic("callback for receiveCh is not executed") + } + return nil +} + +func (s *WorkflowUnitTest) Test_CloseChannelInSelectWorkflow() { + env := s.NewTestWorkflowEnvironment() + env.ExecuteWorkflow(closeChannelInSelectTest) + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) +} + func bufferedChanWorkflowTest(ctx Context, bufferSize int) error { bufferedCh := NewBufferedChannel(ctx, bufferSize)