Adding parallelism to sink
LizaMo committed Feb 24, 2023
1 parent 22966be commit 0f15d71
Showing 11 changed files with 21 additions and 8 deletions.
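
Every file in this commit applies the same change: the DataStreamSink returned by addSink or sinkTo now gets an explicit .setParallelism(parallelism) call instead of inheriting the job-wide default. A minimal sketch of the pattern, assuming the standard Flink Scala API (the wireSink helper and its parameters are hypothetical, not part of the repository):

    import org.apache.flink.streaming.api.datastream.DataStreamSink
    import org.apache.flink.streaming.api.functions.sink.SinkFunction
    import org.apache.flink.streaming.api.scala._

    // Hypothetical helper showing the shape of the change: every sink
    // operator is labeled and then pinned to an explicit parallelism.
    def wireSink[E](
        stream: DataStream[E],
        sink: SinkFunction[E],
        label: String,
        parallelism: Int): DataStreamSink[E] =
      stream
        .addSink(sink)
        .uid(label)                  // stable operator id for savepoints
        .name(label)                 // display name in the Flink UI
        .setParallelism(parallelism) // per-sink override of the job default

The uid and name calls were already in place throughout; only the setParallelism call is new.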
@@ -62,6 +62,7 @@ case class CassandraSinkConfig[ADT <: FlinkEvent](
       .addSink(new CassandraScalaProductSink[E](query, clusterBuilder))
       .uid(label)
       .name(label)
+      .setParallelism(parallelism)
   }

   override def getAvroSink[
@@ -82,5 +83,6 @@ case class CassandraSinkConfig[ADT <: FlinkEvent](
       )
       .uid(label)
       .name(label)
+      .setParallelism(parallelism)

 }
@@ -94,7 +94,7 @@ case class ElasticsearchSinkConfig[ADT <: FlinkEvent](
     bulkFlushMaxActions.foreach(esb.setBulkFlushMaxActions)
     bulkFlushMaxSizeMb.foreach(esb.setBulkFlushMaxSizeMb)
     bulkFlushIntervalMs.foreach(esb.setBulkFlushInterval)
-    dataStream.sinkTo(esb.build()).uid(label).name(label)
+    dataStream.sinkTo(esb.build()).uid(label).name(label).setParallelism(parallelism)
   }

   override def getSink[E <: ADT: TypeInformation](
@@ -119,7 +119,7 @@ case class FileSinkConfig[ADT <: FlinkEvent](
         .withRollingPolicy(getCheckpointRollingPolicy)
         .withOutputFileConfig(getOutputFileConfig)
         .build()
-    )
+    ).setParallelism(parallelism)

   /** Create a bulk avro parquet file sink and send the data stream to it.
     * @param dataStream
@@ -165,7 +165,7 @@ case class FileSinkConfig[ADT <: FlinkEvent](
             s"Invalid format for getAvroSink: $format"
           )
       }
-    dataStream.sinkTo(sink)
+    dataStream.sinkTo(sink).setParallelism(parallelism)
   }

   def getBucketCheckInterval: Long =
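
For context, the sink built above is Flink's FileSink. A minimal sketch of the row-format branch, assuming string records (buildRowFileSink, the path, encoder, and part suffix are illustrative placeholders, not this repository's config-driven values):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder
    import org.apache.flink.connector.file.sink.FileSink
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy

    // Sketch of a row-format FileSink: roll part files on checkpoints,
    // which is what getCheckpointRollingPolicy above suggests the config does.
    def buildRowFileSink(basePath: String): FileSink[String] =
      FileSink
        .forRowFormat(new Path(basePath), new SimpleStringEncoder[String]("UTF-8"))
        .withRollingPolicy(OnCheckpointRollingPolicy.build())
        .withOutputFileConfig(
          OutputFileConfig.builder().withPartSuffix(".part").build())
        .build()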
@@ -99,7 +99,7 @@ case class FirehoseSinkConfig[ADT <: FlinkEvent: TypeInformation](
         .map(k.setMaxBatchSizeInBytes)
         .getOrElse(k)
     }.build()
-    dataStream.sinkTo(kfs)
+    dataStream.sinkTo(kfs).setParallelism(parallelism)
   }

   def getSerializationSchema[E <: ADT: TypeInformation]
@@ -686,6 +686,7 @@ case class JdbcSinkConfig[ADT <: FlinkEvent](
       )
       .uid(label)
       .name(label)
+      .setParallelism(parallelism)
   }

   override def getSink[E <: ADT: TypeInformation](
@@ -139,11 +139,11 @@ case class KafkaSinkConfig[ADT <: FlinkEvent: TypeInformation](
       E <: ADT with EmbeddedAvroRecord[A]: TypeInformation,
       A <: GenericRecord: TypeInformation](
       dataStream: DataStream[E]): DataStreamSink[E] =
-    dataStream.sinkTo(_getSink[E](getAvroSerializationSchema[E, A]))
+    dataStream.sinkTo(_getSink[E](getAvroSerializationSchema[E, A])).setParallelism(parallelism)

   override def getSink[E <: ADT: TypeInformation](
       dataStream: DataStream[E]): DataStreamSink[E] =
-    dataStream.sinkTo(_getSink[E](getSerializationSchema[E]))
+    dataStream.sinkTo(_getSink[E](getSerializationSchema[E])).setParallelism(parallelism)

   def _getSink[E <: ADT: TypeInformation](
       serializer: KafkaRecordSerializationSchema[E]): KafkaSink[E] =
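
The shared _getSink helper presumably assembles a KafkaSink around whichever serializer the caller picked. A sketch against Flink's standard KafkaSink builder (buildKafkaSink, the bootstrap servers, and the delivery guarantee are illustrative assumptions):

    import org.apache.flink.connector.base.DeliveryGuarantee
    import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}

    // Sketch of what _getSink plausibly does: only the record serializer
    // (avro vs. plain) differs between getAvroSink and getSink above.
    def buildKafkaSink[E](
        serializer: KafkaRecordSerializationSchema[E]): KafkaSink[E] =
      KafkaSink
        .builder[E]()
        .setBootstrapServers("localhost:9092") // placeholder; real value comes from config
        .setRecordSerializer(serializer)
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build()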
@@ -79,6 +79,7 @@ case class KinesisSinkConfig[ADT <: FlinkEvent: TypeInformation](
       .sinkTo(ks)
       .uid(label)
       .name(label)
+      .setParallelism(parallelism)
   }

   override def getSink[E <: ADT: TypeInformation](
@@ -58,7 +58,7 @@ case class RabbitMQSinkConfig[ADT <: FlinkEvent: TypeInformation](
         )
       }
     }
-    dataStream.addSink(sink).uid(label).name(label)
+    dataStream.addSink(sink).uid(label).name(label).setParallelism(parallelism)
   }

   override def getSink[E <: ADT: TypeInformation](
@@ -50,6 +50,10 @@ trait SinkConfig[ADT <: FlinkEvent] extends LazyLogging {

   lazy val label: String = s"${connector.entryName.toLowerCase}/$name"

+  lazy val parallelism: Int = config
+    .getIntOpt(pfx("parallelism"))
+    .getOrElse(config.globalParallelism)
+
   def getSink[E <: ADT: TypeInformation](
       dataStream: DataStream[E]): DataStreamSink[E]

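
This trait change is the heart of the commit: every sink resolves a parallelism key under its own config prefix and falls back to the job-wide value. A minimal sketch of that fallback chain, assuming the config layer wraps Typesafe Config (resolveParallelism and the raw Config plumbing are illustrative; getIntOpt, pfx, and globalParallelism are the repository's own names):

    import com.typesafe.config.Config
    import scala.util.Try

    // Sketch of the fallback chain: a sink-scoped "parallelism" key wins,
    // otherwise the job-wide default applies.
    def resolveParallelism(
        config: Config,
        sinkPrefix: String, // e.g. "sinks.kafka-test"
        globalParallelism: Int): Int =
      Try(config.getInt(s"$sinkPrefix.parallelism")).toOption
        .getOrElse(globalParallelism)

With a config containing sinks.kafka-test.parallelism = 10, that sink runs with 10 parallel subtasks; absent the key, it inherits the global setting.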
@@ -84,5 +84,5 @@ case class SocketSinkConfig[ADT <: FlinkEvent](
         maxRetries,
         autoFlush
       )
-    )
+    ).setParallelism(parallelism)
   }
@@ -94,6 +94,11 @@ class KinesisSourceConfigSpec extends PropSpec {
     defaultConfigPlus("efo.enabled = false").useEfo shouldBe false
   }

+  property("parallelism property") {
+    defaultConfigPlus("parallelism = 10").parallelism shouldBe 10
+    defaultConfigPlus("parallelism = 10.5").parallelism shouldBe 10
+  }
+
   property("missing stream property") {
     the[Exception] thrownBy noProvidedConfig should have message "kinesis source kinesis-test is missing required 'stream' property"
   }
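
The 10.5 case pins down coercion rather than failure: a fractional value truncates to its integer part. That matches Typesafe Config's behavior, where getInt on a double-valued key truncates (an assumption about what getIntOpt delegates to):

    import com.typesafe.config.ConfigFactory

    // Typesafe Config truncates when an int is read from a double value,
    // which would explain why "parallelism = 10.5" yields 10 above.
    val conf = ConfigFactory.parseString("parallelism = 10.5")
    println(conf.getInt("parallelism")) // prints 10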
