Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-16738] [py] Add kinesis_egress_record builder method to Python SDK #69

Merged
merged 2 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion statefun-python-sdk/statefun/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@
from statefun.core import StatefulFunctions
from statefun.request_reply import RequestReplyHandler

from statefun.core import kafka_egress_record
from statefun.core import kafka_egress_record
from statefun.core import kinesis_egress_record
32 changes: 28 additions & 4 deletions statefun-python-sdk/statefun/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import inspect

from statefun.kafka_egress_pb2 import KafkaProducerRecord

from statefun.kinesis_egress_pb2 import KinesisEgressRecord

class SdkAddress(object):
def __init__(self, namespace, type, identity):
Expand Down Expand Up @@ -191,12 +191,12 @@ def for_type(self, namespace, type):

def kafka_egress_record(topic: str, value, key: str = None):
"""
Build a ProtobufMessage that can be emitted to a Protobuf based egress.
Build a ProtobufMessage that can be emitted to a Kafka generic egress.

:param topic: The kafka detention topic for that record
:param topic: The Kafka destination topic for that record
:param key: the utf8 encoded string key to produce (can be empty)
:param value: the Protobuf value to produce
:return: A Protobuf message represents the record to be produced via the kafka procurer.
:return: A Protobuf message representing the record to be produced via the Kafka generic egress.
"""
if not topic:
raise ValueError("A destination Kafka topic is missing")
Expand All @@ -208,3 +208,27 @@ def kafka_egress_record(topic: str, value, key: str = None):
if key is not None:
record.key = key
return record

def kinesis_egress_record(stream: str, value, partition_key: str, explicit_hash_key: str = None):
"""
Build a ProtobufMessage that can be emitted to a Kinesis generic egress.

:param stream: The AWS Kinesis destination stream for that record
:param partition_key: the utf8 encoded string partition key to use
:param value: the Protobuf value to produce
:param explicit_hash_key: a utf8 encoded string explicit hash key to use (can be empty)
:return: A Protobuf message representing the record to be produced to AWS Kinesis via the Kinesis generic egress.
"""
if not stream:
raise ValueError("Missing destination Kinesis stream")
if not value:
raise ValueError("Missing value")
if not partition_key:
raise ValueError("Missung partition key")
record = KinesisEgressRecord()
record.stream = stream
record.value_bytes = value.SerializeToString()
record.partition_key = partition_key
if explicit_hash_key is not None:
record.explicit_hash_key = explicit_hash_key
return record
107 changes: 107 additions & 0 deletions statefun-python-sdk/statefun/kinesis_egress_pb2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# -*- coding: utf-8 -*-
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: kinesis-egress.proto

from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)

_sym_db = _symbol_database.Default()




DESCRIPTOR = _descriptor.FileDescriptor(
name='kinesis-egress.proto',
package='org.apache.flink.statefun.flink.io',
syntax='proto3',
serialized_options=b'\n,org.apache.flink.statefun.flink.io.generatedP\001',
serialized_pb=b'\n\x14kinesis-egress.proto\x12\"org.apache.flink.statefun.flink.io\"l\n\x13KinesisEgressRecord\x12\x15\n\rpartition_key\x18\x01 \x01(\t\x12\x13\n\x0bvalue_bytes\x18\x02 \x01(\x0c\x12\x0e\n\x06stream\x18\x03 \x01(\t\x12\x19\n\x11\x65xplicit_hash_key\x18\x04 \x01(\tB0\n,org.apache.flink.statefun.flink.io.generatedP\x01\x62\x06proto3'
)




_KINESISEGRESSRECORD = _descriptor.Descriptor(
name='KinesisEgressRecord',
full_name='org.apache.flink.statefun.flink.io.KinesisEgressRecord',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='partition_key', full_name='org.apache.flink.statefun.flink.io.KinesisEgressRecord.partition_key', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='value_bytes', full_name='org.apache.flink.statefun.flink.io.KinesisEgressRecord.value_bytes', index=1,
number=2, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=b"",
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='stream', full_name='org.apache.flink.statefun.flink.io.KinesisEgressRecord.stream', index=2,
number=3, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='explicit_hash_key', full_name='org.apache.flink.statefun.flink.io.KinesisEgressRecord.explicit_hash_key', index=3,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=b"".decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=60,
serialized_end=168,
)

DESCRIPTOR.message_types_by_name['KinesisEgressRecord'] = _KINESISEGRESSRECORD
_sym_db.RegisterFileDescriptor(DESCRIPTOR)

KinesisEgressRecord = _reflection.GeneratedProtocolMessageType('KinesisEgressRecord', (_message.Message,), {
'DESCRIPTOR' : _KINESISEGRESSRECORD,
'__module__' : 'kinesis_egress_pb2'
# @@protoc_insertion_point(class_scope:org.apache.flink.statefun.flink.io.KinesisEgressRecord)
})
_sym_db.RegisterMessage(KinesisEgressRecord)


DESCRIPTOR._options = None
# @@protoc_insertion_point(module_scope)
14 changes: 14 additions & 0 deletions statefun-python-sdk/tests/request_reply_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from statefun.request_reply_pb2 import ToFunction, FromFunction
from statefun import RequestReplyHandler
from statefun.core import StatefulFunctions, kafka_egress_record
from statefun.core import StatefulFunctions, kinesis_egress_record


class InvocationBuilder(object):
Expand Down Expand Up @@ -130,6 +131,19 @@ def fun(context, message):
context.pack_and_send_egress("sdk/kafka",
kafka_egress_record(topic="hello", value=seen))

# AWS Kinesis generic egress
context.pack_and_send_egress("sdk/kinesis",
kinesis_egress_record(
stream="hello",
partition_key=u"hello world",
value=seen,
explicit_hash_key=u"1234"))
context.pack_and_send_egress("sdk/kinesis",
kinesis_egress_record(
stream="hello",
partition_key=u"hello world",
value=seen))

#
# build the invocation
#
Expand Down