Skip to content

Commit

Permalink
add kinesis serdes
Browse files Browse the repository at this point in the history
  • Loading branch information
nextdude committed Apr 11, 2021
1 parent 822123f commit fff3968
Showing 1 changed file with 32 additions and 8 deletions.
40 changes: 32 additions & 8 deletions src/main/scala/io/epiphanous/flinkrunner/FlinkRunnerFactory.scala
Expand Up @@ -3,27 +3,51 @@ package io.epiphanous.flinkrunner
import io.epiphanous.flinkrunner.flink.BaseFlinkJob
import io.epiphanous.flinkrunner.model._
import io.epiphanous.flinkrunner.operator.AddToJdbcBatchFunction
import org.apache.flink.api.common.serialization.{DeserializationSchema, Encoder, SerializationSchema}
import org.apache.flink.api.common.serialization.{
DeserializationSchema,
Encoder,
SerializationSchema
}
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.connectors.kafka.{KafkaDeserializationSchema, KafkaSerializationSchema}
import org.apache.flink.streaming.connectors.kafka.{
KafkaDeserializationSchema,
KafkaSerializationSchema
}
import org.apache.flink.streaming.connectors.kinesis.serialization.{
KinesisDeserializationSchema,
KinesisSerializationSchema
}

import java.util.Properties

trait FlinkRunnerFactory[ADT <: FlinkEvent] {

def getJobInstance(name: String): BaseFlinkJob[_, _ <: ADT]

def getDeserializationSchema(sourceConfig: SourceConfig): DeserializationSchema[ADT] = ???
def getDeserializationSchema(
sourceConfig: SourceConfig): DeserializationSchema[ADT] = ???

def getKafkaDeserializationSchema(
sourceConfig: KafkaSourceConfig): KafkaDeserializationSchema[ADT] =
???

def getKinesisDeserializationSchema(sourceConfig: KinesisSourceConfig)
: KinesisDeserializationSchema[ADT] = ???

def getKafkaDeserializationSchema(sourceConfig: KafkaSourceConfig): KafkaDeserializationSchema[ADT] = ???
def getSerializationSchema(
sinkConfig: SinkConfig): SerializationSchema[ADT] = ???

def getSerializationSchema(sinkConfig: SinkConfig): SerializationSchema[ADT] = ???
def getKafkaSerializationSchema(
sinkConfig: KafkaSinkConfig): KafkaSerializationSchema[ADT] = ???

def getKafkaSerializationSchema(sinkConfig: KafkaSinkConfig): KafkaSerializationSchema[ADT] = ???
def getKinesisSerializationSchema(
sinkConfig: KinesisSinkConfig): KinesisSerializationSchema[ADT] = ???

def getEncoder(sinkConfig: SinkConfig): Encoder[ADT] = ???

def getAddToJdbcBatchFunction(sinkConfig: SinkConfig): AddToJdbcBatchFunction[ADT] = ???
def getAddToJdbcBatchFunction(
sinkConfig: SinkConfig): AddToJdbcBatchFunction[ADT] = ???

def getBucketAssigner(props: Properties): BucketAssigner[ADT, String] = ???
def getBucketAssigner(props: Properties): BucketAssigner[ADT, String] =
???
}

0 comments on commit fff3968

Please sign in to comment.