
SQS input with session-based windowing doesn't work #19406

@kennknowles

Description


Hi,


I'm trying to use Beam with AWS SQS as the input source, together with session windows.

The session windows never fire, so nothing is emitted downstream. The same code works as expected when the input source is Kafka:


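// Imports (Beam Java SDK amazon-web-services module, AWS SDK for Java v1, Joda-Time)
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.model.Message;
import java.text.SimpleDateFormat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.aws.options.AwsOptions;
import org.apache.beam.sdk.io.aws.sqs.SqsIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm");

PipelineOptions options = PipelineOptionsFactory.create();
AwsOptions awsOptions = options.as(AwsOptions.class);
BasicAWSCredentials awsCreds = new BasicAWSCredentials("", "");
awsOptions.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCreds));
awsOptions.setAwsRegion("eu-west-1");

Pipeline p = Pipeline.create(options);

// Read messages from the SQS queue.
p.apply(SqsIO.read().withQueueUrl("https://sqs.eu-west-1.amazonaws.com/XXXXXXXXXXXXXXXX"))

    // Log and emit the body of each SQS message.
    .apply(ParDo.of(new DoFn<Message, String>() {
      @ProcessElement
      public void processElement(@Element Message element, OutputReceiver<String> out) {
        LOG.info("Message Body: {}", element.getBody());
        out.output(element.getBody());
      }
    }))

    // Set the windowing configuration: session windows with a 5-second gap.
    .apply(
        "WindowIntoSessions",
        Window.<String>into(Sessions.withGapDuration(Duration.standardSeconds(5)))
            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)
            //.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            // Late data is dropped
            .accumulatingFiredPanes()
            .withAllowedLateness(Duration.ZERO))

    // Map each message body to a KV pair of <body, 1> so it can be counted per key.
    .apply(
        MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
            .via((String testO) -> KV.of(testO, 1)))

    .apply("CountElements", Sum.integersPerKey())

    .apply("Log", ParDo.of(new FilterTextFn()))

    .apply(
        MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Integer> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()));

p.run().waitUntilFinish();
} // closes the enclosing method; its signature, LOG and FilterTextFn were not included in the report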
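For context on what the pipeline above expects: with `Sessions.withGapDuration(Duration.standardSeconds(5))`, elements of the same key whose timestamps are less than 5 seconds apart merge into one session, and under the default trigger a session's pane is emitted only after the watermark passes the end of that session. The following is a hypothetical, Beam-free Java sketch of those two rules, assuming a single key and timestamps in milliseconds; `SessionWindowSketch`, `sessions` and `fired` are illustrative names, not Beam API. It may help when reasoning about whether the source's watermark ever moves past the end of a session.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrative (not Beam's implementation) model of session merging and watermark firing for one key. */
public class SessionWindowSketch {

    /** A session window [start, end), where end = last element timestamp + gap. */
    static final class Session {
        final long start;
        final long end;
        final int count;

        Session(long start, long end, int count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }

        @Override
        public String toString() {
            return "Session[start=" + start + ", end=" + end + ", count=" + count + "]";
        }
    }

    /** Each element gets window [ts, ts + gap); overlapping windows are merged into one session. */
    static List<Session> sessions(List<Long> timestamps, long gapMillis) {
        List<Long> ts = new ArrayList<>(timestamps);
        Collections.sort(ts);
        List<Session> out = new ArrayList<>();
        long start = 0;
        long end = 0;
        int count = 0;
        for (long t : ts) {
            if (count > 0 && t < end) {
                // [t, t + gap) overlaps the current session, so extend it.
                end = Math.max(end, t + gapMillis);
                count++;
            } else {
                // Gap reached: close the current session (if any) and start a new one.
                if (count > 0) {
                    out.add(new Session(start, end, count));
                }
                start = t;
                end = t + gapMillis;
                count = 1;
            }
        }
        if (count > 0) {
            out.add(new Session(start, end, count));
        }
        return out;
    }

    /** Default-trigger behaviour: a session fires once the watermark reaches its end. */
    static List<Session> fired(List<Session> sessions, long watermarkMillis) {
        List<Session> out = new ArrayList<>();
        for (Session s : sessions) {
            if (watermarkMillis >= s.end) {
                out.add(s);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Session> s = sessions(List.of(0L, 2_000L, 4_000L, 20_000L), 5_000L);
        System.out.println(s);
        System.out.println(fired(s, 10_000L));
        System.out.println(fired(s, 4_000L));
    }
}
```

In this model, the four timestamps form two sessions, [0 s, 9 s) with three elements and [20 s, 25 s) with one. A watermark of 10 s releases only the first session, and a watermark of 4 s releases none. If the watermark never advances, no session is ever released, regardless of how many elements arrive.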



Imported from Jira BEAM-7498. Original Jira may contain additional context.
Reported by: esteveavi.
