From fff3968c93257acfc9fd65a9586a6eb53b75d9cc Mon Sep 17 00:00:00 2001 From: Robert Lyons Date: Sun, 11 Apr 2021 18:05:55 -0400 Subject: [PATCH] add kinesis serdes --- .../flinkrunner/FlinkRunnerFactory.scala | 40 +++++++++++++++---- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/src/main/scala/io/epiphanous/flinkrunner/FlinkRunnerFactory.scala b/src/main/scala/io/epiphanous/flinkrunner/FlinkRunnerFactory.scala index 24afc4c..b141a36 100644 --- a/src/main/scala/io/epiphanous/flinkrunner/FlinkRunnerFactory.scala +++ b/src/main/scala/io/epiphanous/flinkrunner/FlinkRunnerFactory.scala @@ -3,9 +3,20 @@ package io.epiphanous.flinkrunner import io.epiphanous.flinkrunner.flink.BaseFlinkJob import io.epiphanous.flinkrunner.model._ import io.epiphanous.flinkrunner.operator.AddToJdbcBatchFunction -import org.apache.flink.api.common.serialization.{DeserializationSchema, Encoder, SerializationSchema} +import org.apache.flink.api.common.serialization.{ + DeserializationSchema, + Encoder, + SerializationSchema +} import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner -import org.apache.flink.streaming.connectors.kafka.{KafkaDeserializationSchema, KafkaSerializationSchema} +import org.apache.flink.streaming.connectors.kafka.{ + KafkaDeserializationSchema, + KafkaSerializationSchema +} +import org.apache.flink.streaming.connectors.kinesis.serialization.{ + KinesisDeserializationSchema, + KinesisSerializationSchema +} import java.util.Properties @@ -13,17 +24,30 @@ trait FlinkRunnerFactory[ADT <: FlinkEvent] { def getJobInstance(name: String): BaseFlinkJob[_, _ <: ADT] - def getDeserializationSchema(sourceConfig: SourceConfig): DeserializationSchema[ADT] = ??? + def getDeserializationSchema( + sourceConfig: SourceConfig): DeserializationSchema[ADT] = ??? + + def getKafkaDeserializationSchema( + sourceConfig: KafkaSourceConfig): KafkaDeserializationSchema[ADT] = + ??? + + def getKinesisDeserializationSchema(sourceConfig: KinesisSourceConfig) + : KinesisDeserializationSchema[ADT] = ??? - def getKafkaDeserializationSchema(sourceConfig: KafkaSourceConfig): KafkaDeserializationSchema[ADT] = ??? + def getSerializationSchema( + sinkConfig: SinkConfig): SerializationSchema[ADT] = ??? - def getSerializationSchema(sinkConfig: SinkConfig): SerializationSchema[ADT] = ??? + def getKafkaSerializationSchema( + sinkConfig: KafkaSinkConfig): KafkaSerializationSchema[ADT] = ??? - def getKafkaSerializationSchema(sinkConfig: KafkaSinkConfig): KafkaSerializationSchema[ADT] = ??? + def getKinesisSerializationSchema( + sinkConfig: KinesisSinkConfig): KinesisSerializationSchema[ADT] = ??? def getEncoder(sinkConfig: SinkConfig): Encoder[ADT] = ??? - def getAddToJdbcBatchFunction(sinkConfig: SinkConfig): AddToJdbcBatchFunction[ADT] = ??? + def getAddToJdbcBatchFunction( + sinkConfig: SinkConfig): AddToJdbcBatchFunction[ADT] = ??? - def getBucketAssigner(props: Properties): BucketAssigner[ADT, String] = ??? + def getBucketAssigner(props: Properties): BucketAssigner[ADT, String] = + ??? }