Either Create or DirectRunner fails to produce all elements to the following transform #21257

Closed
damccorm opened this issue Jun 4, 2022 · 2 comments
Labels: bug, direct, done & done, java, P1, runners


@damccorm
Contributor

damccorm commented Jun 4, 2022

The following pipeline fails to print all of the numbers 1 to 100 when run on the DirectRunner in streaming mode.

This was identified while implementing org.apache.beam.sdk.io.gcp.pubsublite.ReadWriteIT, which uses a workaround for this bug in either Create or the DirectRunner:


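import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical wrapper class; in the original report these members live in ReadWriteIT.
public class CreateRepro {
  private static final int COUNT = 100;

  // Number of elements that have reached the "createMessages" step so far.
  private static final AtomicInteger CREATED_COUNT = new AtomicInteger();

  // Expects a Pipeline configured for the DirectRunner with streaming enabled.
  public static void run(Pipeline pipeline) {
    PCollection<Integer> indexes =
        pipeline.apply(
            "createIndexes",
            Create.of(IntStream.range(0, COUNT).boxed().collect(Collectors.toList())));

    indexes.apply(
        "createMessages",
        MapElements.via(
            new SimpleFunction<Integer, Integer>(
                index -> {
                  // Side effect: print how many elements have been processed so far.
                  System.err.println("Created message index " + CREATED_COUNT.incrementAndGet());
                  return index;
                }) {}));

    pipeline.run().waitUntilFinish(); // Never terminates
  }
}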

Imported from Jira BEAM-12867. Original Jira may contain additional context.
Reported by: dpcollins-google.

@kennknowles
Member

@dpcollins-google per my prior comment, can you confirm with a PAssert whether elements are actually failing to be processed? Alternatively, you may take the position that all side effects of a DoFn should have completed before the pipeline shuts down. Overall, I am not convinced this is P1, but it could be a real bug.
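The distinction between "data was not processed" and "side effects were observed too early" can be illustrated outside Beam with plain java.util.concurrent. The sketch below is only an analogy, not a description of how the DirectRunner shuts down; the class SideEffectCompletion and the method countSideEffects are hypothetical. Each task performs one side effect (incrementing a shared counter), and the counter is read only after the executor confirms that every task has finished.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical analogy: side effects are reliable to inspect only after all work has completed.
public class SideEffectCompletion {

  // Runs `count` tasks that each increment a shared counter, waits for all of them, then reads it.
  public static int countSideEffects(int count) throws InterruptedException {
    AtomicInteger sideEffects = new AtomicInteger();
    ExecutorService workers = Executors.newFixedThreadPool(4);
    for (int i = 0; i < count; i++) {
      workers.execute(() -> sideEffects.incrementAndGet());
    }
    // shutdown() stops accepting new tasks but does not wait for queued or running ones.
    workers.shutdown();
    // Only once awaitTermination returns true have all side effects happened.
    if (!workers.awaitTermination(1, TimeUnit.MINUTES)) {
      throw new IllegalStateException("workers did not finish in time");
    }
    return sideEffects.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(countSideEffects(100)); // prints 100
  }
}
```

Reading the counter right after shutdown(), without awaitTermination, could report fewer than 100 even though every task eventually runs, which is the kind of ambiguity a PAssert on the pipeline's output avoids.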

damccorm added the java label Oct 5, 2022
johnjcasey self-assigned this Oct 19, 2022
@johnjcasey
Contributor

I attempted to reproduce this:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
public class SimpleStreamingTest {

  @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(SimpleStreamingTest.class);

  private static final Logger LOG = LoggerFactory.getLogger(SimpleStreamingTest.class);

  private static final int COUNT = 100;

  @Parameterized.Parameters
  public static Object[][] data() {
    return new Object[100][0];
  }

  @Test
  public void testDoFnSideEffects() {
    PipelineOptions options = PipelineOptionsFactory.create();
    options.as(StreamingOptions.class).setStreaming(true);
    Pipeline testPipeline = Pipeline.create(options);
    List<Integer> indexes = IntStream.range(0, COUNT).boxed().collect(Collectors.toList());
    PCollection<Integer> values = testPipeline.apply("createIndexes", Create.of(indexes))
        .apply("createMessages", ParDo.of(new Counter()));

    PAssert.that(values).containsInAnyOrder(indexes);

    testPipeline.run().waitUntilFinish();

    for (int i : indexes) {
      expectedLogs.verifyError(String.valueOf(i));
    }
  }

  private static class Counter extends DoFn<Integer, Integer> {

    @ProcessElement
    public void processElement(@Element Integer i, OutputReceiver<Integer> receiver) {
      LOG.error(String.valueOf(i));
      receiver.output(i);
    }
  }
}

Running the above test 100 times (once per empty parameter set) produced no failures, so I believe this issue no longer reproduces.
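The key check in this reproduction is PAssert.that(values).containsInAnyOrder(indexes), which passes only if the output holds every index exactly once, in any order, so a dropped element would fail the test regardless of what was logged. A minimal, Beam-free sketch of that multiset comparison follows; the class InAnyOrder and its helper are hypothetical illustrations, not PAssert's implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating the comparison a containsInAnyOrder assertion makes.
public class InAnyOrder {

  // True when `actual` contains exactly the elements of `expected`, each the same number of times.
  public static <T> boolean containsInAnyOrder(List<T> actual, List<T> expected) {
    if (actual.size() != expected.size()) {
      return false;
    }
    // Count how many times each expected element still needs to be seen.
    Map<T, Integer> remaining = new HashMap<>();
    for (T e : expected) {
      remaining.merge(e, 1, Integer::sum);
    }
    for (T a : actual) {
      Integer left = remaining.get(a);
      if (left == null || left == 0) {
        return false; // unexpected element, or an element seen too many times
      }
      remaining.put(a, left - 1);
    }
    // Equal sizes and no over-counted element imply every expected element was matched.
    return true;
  }

  public static void main(String[] args) {
    System.out.println(containsInAnyOrder(List.of(3, 1, 2), List.of(1, 2, 3))); // true
    System.out.println(containsInAnyOrder(List.of(1, 2), List.of(1, 2, 3))); // false: 3 is missing
  }
}
```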

github-actions bot added this to the 2.43.0 Release milestone Oct 19, 2022
apilloud added the done & done label Oct 25, 2022