Skip to content

Commit 70e82fa

Browse files
kmbottsjamesls
authored andcommitted
Added MaximumBatchingWindowInSeconds aws#1778
1 parent ab764d4 commit 70e82fa

File tree

9 files changed

+148
-42
lines changed

9 files changed

+148
-42
lines changed

chalice/app.py

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -721,14 +721,16 @@ def on_sns_message(self, topic, name=None):
721721
)
722722

723723
def on_sqs_message(self, queue=None, batch_size=1,
724-
name=None, queue_arn=None):
724+
name=None, queue_arn=None,
725+
maximum_batching_window_in_seconds=0):
725726
return self._create_registration_function(
726727
handler_type='on_sqs_message',
727728
name=name,
728729
registration_kwargs={
729730
'queue': queue,
730731
'queue_arn': queue_arn,
731732
'batch_size': batch_size,
733+
'maximum_batching_window_in_seconds': maximum_batching_window_in_seconds
732734
}
733735
)
734736

@@ -748,23 +750,27 @@ def schedule(self, expression, name=None, description=''):
748750
)
749751

750752
def on_kinesis_record(self, stream, batch_size=100,
751-
starting_position='LATEST', name=None):
753+
starting_position='LATEST', name=None,
754+
maximum_batching_window_in_seconds=0):
752755
return self._create_registration_function(
753756
handler_type='on_kinesis_record',
754757
name=name,
755758
registration_kwargs={'stream': stream,
756759
'batch_size': batch_size,
757-
'starting_position': starting_position},
760+
'starting_position': starting_position,
761+
'maximum_batching_window_in_seconds': maximum_batching_window_in_seconds},
758762
)
759763

760764
def on_dynamodb_record(self, stream_arn, batch_size=100,
761-
starting_position='LATEST', name=None):
765+
starting_position='LATEST', name=None,
766+
maximum_batching_window_in_seconds=0):
762767
return self._create_registration_function(
763768
handler_type='on_dynamodb_record',
764769
name=name,
765770
registration_kwargs={'stream_arn': stream_arn,
766771
'batch_size': batch_size,
767-
'starting_position': starting_position},
772+
'starting_position': starting_position,
773+
'maximum_batching_window_in_seconds': maximum_batching_window_in_seconds},
768774
)
769775

770776
def route(self, path, **kwargs):
@@ -994,6 +1000,7 @@ def _register_on_sqs_message(self, name, handler_string, kwargs, **unused):
9941000
queue=queue,
9951001
queue_arn=queue_arn,
9961002
batch_size=kwargs['batch_size'],
1003+
maximum_batching_window_in_seconds=kwargs['maximum_batching_window_in_seconds'],
9971004
)
9981005
self.event_sources.append(sqs_config)
9991006

@@ -1005,6 +1012,7 @@ def _register_on_kinesis_record(self, name, handler_string,
10051012
stream=kwargs['stream'],
10061013
batch_size=kwargs['batch_size'],
10071014
starting_position=kwargs['starting_position'],
1015+
maximum_batching_window_in_seconds=kwargs['maximum_batching_window_in_seconds'],
10081016
)
10091017
self.event_sources.append(kinesis_config)
10101018

@@ -1016,6 +1024,7 @@ def _register_on_dynamodb_record(self, name, handler_string,
10161024
stream_arn=kwargs['stream_arn'],
10171025
batch_size=kwargs['batch_size'],
10181026
starting_position=kwargs['starting_position'],
1027+
maximum_batching_window_in_seconds=kwargs['maximum_batching_window_in_seconds'],
10191028
)
10201029
self.event_sources.append(ddb_config)
10211030

@@ -1460,29 +1469,35 @@ def __init__(self, name, handler_string, topic):
14601469

14611470

14621471
class SQSEventConfig(BaseEventSourceConfig):
1463-
def __init__(self, name, handler_string, queue, queue_arn, batch_size):
1472+
def __init__(self, name, handler_string, queue, queue_arn, batch_size,
1473+
maximum_batching_window_in_seconds):
14641474
super(SQSEventConfig, self).__init__(name, handler_string)
14651475
self.queue = queue
14661476
self.queue_arn = queue_arn
14671477
self.batch_size = batch_size
1478+
self.maximum_batching_window_in_seconds = maximum_batching_window_in_seconds
14681479

14691480

14701481
class KinesisEventConfig(BaseEventSourceConfig):
14711482
def __init__(self, name, handler_string, stream,
1472-
batch_size, starting_position):
1483+
batch_size, starting_position,
1484+
maximum_batching_window_in_seconds):
14731485
super(KinesisEventConfig, self).__init__(name, handler_string)
14741486
self.stream = stream
14751487
self.batch_size = batch_size
14761488
self.starting_position = starting_position
1489+
self.maximum_batching_window_in_seconds = maximum_batching_window_in_seconds
14771490

14781491

14791492
class DynamoDBEventConfig(BaseEventSourceConfig):
14801493
def __init__(self, name, handler_string, stream_arn,
1481-
batch_size, starting_position):
1494+
batch_size, starting_position,
1495+
maximum_batching_window_in_seconds):
14821496
super(DynamoDBEventConfig, self).__init__(name, handler_string)
14831497
self.stream_arn = stream_arn
14841498
self.batch_size = batch_size
14851499
self.starting_position = starting_position
1500+
self.maximum_batching_window_in_seconds = maximum_batching_window_in_seconds
14861501

14871502

14881503
class WebsocketConnectConfig(BaseEventSourceConfig):

chalice/app.pyi

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -354,13 +354,14 @@ class SNSEventConfig(BaseEventSourceConfig):
354354

355355

356356
class SQSEventConfig(BaseEventSourceConfig):
357-
queue = ... # type: Optional[str]
358-
queue_arn = ... # type: Optional[str]
359-
batch_size = ... # type: int
357+
queue = ... # type: Optional[str]
358+
queue_arn = ... # type: Optional[str]
359+
batch_size = ... # type: int
360+
maximum_batching_window_in_seconds = ... # type: int
360361

361362
def __init__(
362363
self, name: str, handler_string: str, queue: Optional[str],
363-
queue_arn: Optional[str], batch_size: int
364+
queue_arn: Optional[str], batch_size: int, maximum_batching_window_in_seconds: int,
364365
) -> None: ...
365366

366367

@@ -374,15 +375,17 @@ class CloudWatchEventConfig(BaseEventSourceConfig):
374375

375376

376377
class KinesisEventConfig(BaseEventSourceConfig):
377-
stream = ... # type: str
378-
batch_size = ... # type: int
379-
starting_position = ... # type: str
378+
stream = ... # type: str
379+
batch_size = ... # type: int
380+
starting_position = ... # type: str
381+
maximum_batching_window_in_seconds = ... # type: int
380382

381383

382384
class DynamoDBEventConfig(BaseEventSourceConfig):
383-
stream_arn = ... # type: str
384-
batch_size = ... # type: int
385-
starting_position = ... # type: str
385+
stream_arn = ... # type: str
386+
batch_size = ... # type: int
387+
starting_position = ... # type: str
388+
maximum_batching_window_in_seconds = ... # type: int
386389

387390

388391
class Blueprint(DecoratorAPI):

chalice/deploy/appgraph.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,7 @@ def _create_sqs_subscription(
570570
queue=queue,
571571
batch_size=sqs_config.batch_size,
572572
lambda_function=lambda_function,
573+
maximum_batching_window_in_seconds=sqs_config.maximum_batching_window_in_seconds,
573574
)
574575
return sqs_event_source
575576

@@ -590,6 +591,7 @@ def _create_kinesis_subscription(
590591
resource_name=resource_name,
591592
stream=kinesis_config.stream,
592593
batch_size=kinesis_config.batch_size,
594+
maximum_batching_window_in_seconds=kinesis_config.maximum_batching_window_in_seconds,
593595
starting_position=kinesis_config.starting_position,
594596
lambda_function=lambda_function,
595597
)
@@ -612,6 +614,7 @@ def _create_ddb_subscription(
612614
resource_name=resource_name,
613615
stream_arn=ddb_config.stream_arn,
614616
batch_size=ddb_config.batch_size,
617+
maximum_batching_window_in_seconds=ddb_config.maximum_batching_window_in_seconds,
615618
starting_position=ddb_config.starting_position,
616619
lambda_function=lambda_function,
617620
)

chalice/deploy/models.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -344,21 +344,24 @@ def queue_name(self):
344344
@attrs
345345
class SQSEventSource(FunctionEventSubscriber):
346346
resource_type = 'sqs_event'
347-
queue = attrib() # type: Union[str, QueueARN]
348-
batch_size = attrib() # type: int
347+
queue = attrib() # type: Union[str, QueueARN]
348+
batch_size = attrib() # type: int
349+
maximum_batching_window_in_seconds = attrib() # type: int
349350

350351

351352
@attrs
352353
class KinesisEventSource(FunctionEventSubscriber):
353354
resource_type = 'kinesis_event'
354-
stream = attrib() # type: str
355-
batch_size = attrib() # type: int
356-
starting_position = attrib() # type: str
355+
stream = attrib() # type: str
356+
batch_size = attrib() # type: int
357+
starting_position = attrib() # type: str
358+
maximum_batching_window_in_seconds = attrib() # type: int
357359

358360

359361
@attrs
360362
class DynamoDBEventSource(FunctionEventSubscriber):
361363
resource_type = 'dynamodb_event'
362-
stream_arn = attrib() # type: str
363-
batch_size = attrib() # type: int
364-
starting_position = attrib() # type: str
364+
stream_arn = attrib() # type: str
365+
batch_size = attrib() # type: int
366+
starting_position = attrib() # type: str
367+
maximum_batching_window_in_seconds = attrib() # type: int

chalice/deploy/planner.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -728,7 +728,8 @@ def _plan_sqseventsource(self, resource):
728728
models.APICall(
729729
method_name='update_lambda_event_source',
730730
params={'event_uuid': uuid,
731-
'batch_size': resource.batch_size}
731+
'batch_size': resource.batch_size,
732+
'maximum_batching_window_in_seconds': resource.maximum_batching_window_in_seconds}
732733
)
733734
] + self._batch_record_resource(
734735
'sqs_event', resource.resource_name, {
@@ -743,6 +744,7 @@ def _plan_sqseventsource(self, resource):
743744
method_name='create_lambda_event_source',
744745
params={'event_source_arn': Variable(queue_arn_varname),
745746
'batch_size': resource.batch_size,
747+
'maximum_batching_window_in_seconds': resource.maximum_batching_window_in_seconds,
746748
'function_name': function_arn},
747749
output_var=uuid_varname,
748750
), 'Subscribing %s to SQS queue %s\n'
@@ -782,7 +784,8 @@ def _plan_kinesiseventsource(self, resource):
782784
models.APICall(
783785
method_name='update_lambda_event_source',
784786
params={'event_uuid': uuid,
785-
'batch_size': resource.batch_size}
787+
'batch_size': resource.batch_size,
788+
'maximum_batching_window_in_seconds': resource.maximum_batching_window_in_seconds}
786789
)
787790
] + self._batch_record_resource(
788791
'kinesis_event', resource.resource_name, {
@@ -798,7 +801,8 @@ def _plan_kinesiseventsource(self, resource):
798801
params={'event_source_arn': Variable(stream_arn_varname),
799802
'batch_size': resource.batch_size,
800803
'function_name': function_arn,
801-
'starting_position': resource.starting_position},
804+
'starting_position': resource.starting_position,
805+
'maximum_batching_window_in_seconds': resource.maximum_batching_window_in_seconds},
802806
output_var=uuid_varname,
803807
), 'Subscribing %s to Kinesis stream %s\n'
804808
% (resource.lambda_function.function_name, resource.stream)
@@ -826,7 +830,8 @@ def _plan_dynamodbeventsource(self, resource):
826830
models.APICall(
827831
method_name='update_lambda_event_source',
828832
params={'event_uuid': uuid,
829-
'batch_size': resource.batch_size}
833+
'batch_size': resource.batch_size,
834+
'maximum_batching_window_in_seconds': resource.maximum_batching_window_in_seconds}
830835
)
831836
] + self._batch_record_resource(
832837
'dynamodb_event', resource.resource_name, {
@@ -841,7 +846,8 @@ def _plan_dynamodbeventsource(self, resource):
841846
params={'event_source_arn': resource.stream_arn,
842847
'batch_size': resource.batch_size,
843848
'function_name': function_arn,
844-
'starting_position': resource.starting_position},
849+
'starting_position': resource.starting_position,
850+
'maximum_batching_window_in_seconds': resource.maximum_batching_window_in_seconds},
845851
output_var=uuid_varname,
846852
), 'Subscribing %s to DynamoDB stream %s\n'
847853
% (resource.lambda_function.function_name,

chalice/package.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,7 @@ def _generate_sqseventsource(self, resource, template):
645645
'Properties': {
646646
'Queue': queue,
647647
'BatchSize': resource.batch_size,
648+
'MaximumBatchingWindowInSeconds': resource.maximum_batching_window_in_seconds,
648649
}
649650
}
650651
}
@@ -666,6 +667,7 @@ def _generate_kinesiseventsource(self, resource, template):
666667
},
667668
'BatchSize': resource.batch_size,
668669
'StartingPosition': resource.starting_position,
670+
'MaximumBatchingWindowInSeconds': resource.maximum_batching_window_in_seconds,
669671
}
670672
function_cfn['Properties']['Events'] = {
671673
kinesis_cfn_name: {
@@ -685,6 +687,7 @@ def _generate_dynamodbeventsource(self, resource, template):
685687
'Stream': resource.stream_arn,
686688
'BatchSize': resource.batch_size,
687689
'StartingPosition': resource.starting_position,
690+
'MaximumBatchingWindowInSeconds': resource.maximum_batching_window_in_seconds,
688691
}
689692
function_cfn['Properties']['Events'] = {
690693
ddb_cfn_name: {
@@ -916,6 +919,7 @@ def _generate_sqseventsource(self, resource, template):
916919
resource.resource_name] = {
917920
'event_source_arn': event_source_arn,
918921
'batch_size': resource.batch_size,
922+
'maximum_batching_window_in_seconds': resource.maximum_batching_window_in_seconds,
919923
'function_name': self._fref(resource.lambda_function)
920924
}
921925

@@ -929,6 +933,7 @@ def _generate_kinesiseventsource(self, resource, template):
929933
stream=resource.stream),
930934
'batch_size': resource.batch_size,
931935
'starting_position': resource.starting_position,
936+
'maximum_batching_window_in_seconds': resource.maximum_batching_window_in_seconds,
932937
'function_name': self._fref(resource.lambda_function)
933938
}
934939

@@ -939,6 +944,7 @@ def _generate_dynamodbeventsource(self, resource, template):
939944
'event_source_arn': resource.stream_arn,
940945
'batch_size': resource.batch_size,
941946
'starting_position': resource.starting_position,
947+
'maximum_batching_window_in_seconds': resource.maximum_batching_window_in_seconds,
942948
'function_name': self._fref(resource.lambda_function),
943949
}
944950

0 commit comments

Comments
 (0)