Flink 1.16 (#3683)
* Flink 1.16

* Kafka upgrade/cleanup

* Publish flink-scala-utils

* Deps updates

* StreamEnvPreparer adjustments

* Revert "Kafka upgrade/cleanup"

This reverts commit 8189570.

* workaround for bytecode changes
mproch committed Nov 22, 2022
1 parent a4f901a commit 7de5cae
Showing 5 changed files with 31 additions and 13 deletions.
7 changes: 3 additions & 4 deletions build.sbt
@@ -227,7 +227,6 @@ lazy val commonSettings =
"com.typesafe.akka" %% "akka-stream" % akkaV,
"com.typesafe.akka" %% "akka-testkit" % akkaV,

// akka-actor depends on old, 0.8 version
"org.scala-lang.modules" %% "scala-java8-compat" % scalaCompatV,

//security features
@@ -253,7 +252,7 @@
)
)

val flinkV = "1.15.2"
val flinkV = "1.16.0"
val avroV = "1.11.0"
//we should use max(version used by confluent, version acceptable by flink), https://docs.confluent.io/platform/current/installation/versions-interoperability.html - confluent version reference
//TODO: upgrade to 3.x after flink upgrade: flink up to version 1.15 doesn't accept kafka 3.x because org.apache.kafka.common.Metric.value was renamed to metricValue - it should be changed in flink 1.16
@@ -274,7 +273,7 @@ val scalaParsersV = "1.0.4"
val everitSchemaV = "1.14.1"
val slf4jV = "1.7.30"
val scalaLoggingV = "3.9.2"
val scalaCompatV = "0.9.1"
val scalaCompatV = "1.0.2"
val ficusV = "1.4.7"
val configV = "1.4.1"
val commonsLangV = "3.3.2"
@@ -1516,7 +1515,7 @@ lazy val bom = (project in file("bom"))
lazy val modules = List[ProjectReference](
requestResponseRuntime, liteEngineRuntimeApp, flinkDeploymentManager, flinkPeriodicDeploymentManager, flinkDevModel, flinkDevModelJava, defaultModel,
openapiComponents, interpreter, benchmarks, kafkaUtils, kafkaComponentsUtils, kafkaTestUtils, componentsUtils, componentsTestkit, defaultHelpers, commonUtils, utilsInternal, testUtils,
flinkExecutor, flinkSchemedKafkaComponentsUtils, flinkKafkaComponentsUtils, flinkComponentsUtils, flinkTests, flinkTestUtils, flinkComponentsApi, flinkExtensionsApi,
flinkExecutor, flinkSchemedKafkaComponentsUtils, flinkKafkaComponentsUtils, flinkComponentsUtils, flinkTests, flinkTestUtils, flinkComponentsApi, flinkExtensionsApi, flinkScalaUtils,
requestResponseComponentsUtils, requestResponseComponentsApi, componentsApi, extensionsApi, security, processReports, httpUtils,
restmodel, listenerApi, deploymentManagerApi, designer, sqlComponents, schemedKafkaComponentsUtils, flinkBaseComponents, flinkKafkaComponents,
liteComponentsApi, liteEngineKafkaComponentsApi, liteEngineRuntime, liteBaseComponents, liteKafkaComponents, liteKafkaComponentsTests, liteEngineKafkaRuntime, liteEngineKafkaIntegrationTest, liteEmbeddedDeploymentManager, liteK8sDeploymentManager,
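For context on the one-line version bumps above: every Flink artifact in build.sbt is declared against the single `flinkV` val, so changing it from 1.15.2 to 1.16.0 upgrades all Flink dependencies at once. A minimal sketch of such declarations (the artifact list below is illustrative, not copied from this repository):

```scala
// Sketch only: module names are assumed, not taken from this commit.
// Bumping the single `flinkV` val rolls every declaration like these to Flink 1.16.
libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-streaming-java"       % flinkV % Provided,
  "org.apache.flink" % "flink-statebackend-rocksdb" % flinkV % Provided,
  "org.apache.flink" % "flink-connector-kafka"      % flinkV
)
```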
@@ -162,15 +162,15 @@ class FlinkProcessRegistrar(compileProcess: (CanonicalProcess, ProcessVersion, D
val typeInformationForTi = InterpretationResultTypeInformation.create(typeInformationDetection, part.contextBefore)
val typeInformationForVC = typeInformationDetection.forContext(part.contextBefore)

registerSubsequentPart(start.getSideOutput(new OutputTag[InterpretationResult](part.id, typeInformationForTi))
registerSubsequentPart(sideOutput(start, new OutputTag[InterpretationResult](part.id, typeInformationForTi))
.map((value: InterpretationResult) => value.finalContext, typeInformationForVC), part)
}.foldLeft(Map[BranchEndDefinition, BranchEndData]()) {
_ ++ _
}
val branchForEnds = part.ends.collect {
case TypedEnd(be: BranchEnd, validationContext) =>
val ti = InterpretationResultTypeInformation.create(typeInformationDetection, validationContext)
be.definition -> BranchEndData(validationContext, start.getSideOutput(new OutputTag[InterpretationResult](be.nodeId, ti)))
be.definition -> BranchEndData(validationContext, sideOutput(start, new OutputTag[InterpretationResult](be.nodeId, ti)))
}.toMap
branchesForParts ++ branchForEnds
}
@@ -193,8 +193,8 @@ class FlinkProcessRegistrar(compileProcess: (CanonicalProcess, ProcessVersion, D
val typeInformationForIR = InterpretationResultTypeInformation.create(typeInformationDetection, contextBefore)
val typeInformationForCtx = typeInformationDetection.forContext(contextBefore)
// TODO: for sinks there are no further nodes to interpret but the function is registered to invoke listeners (e.g. to measure end metrics).
val afterInterpretation = registerInterpretationPart(start, part, SinkInterpretationName)
.getSideOutput(new OutputTag[InterpretationResult](FlinkProcessRegistrar.EndId, typeInformationForIR))
val afterInterpretation = sideOutput(registerInterpretationPart(start, part, SinkInterpretationName),
new OutputTag[InterpretationResult](FlinkProcessRegistrar.EndId, typeInformationForIR))
.map((value: InterpretationResult) => value.finalContext, typeInformationForCtx)
val customNodeContext = nodeContext(nodeComponentInfoFrom(part), Left(contextBefore))
val withValuePrepared = sink.prepareValue(afterInterpretation, customNodeContext)
@@ -270,6 +270,9 @@ class FlinkProcessRegistrar(compileProcess: (CanonicalProcess, ProcessVersion, D

}

private def sideOutput[T](stream: SingleOutputStreamOperator[_], tag: OutputTag[T])
= streamExecutionEnvPreparer.sideOutputGetter(stream, tag)

private def nodeComponentInfoFrom(processPart: ProcessPart): NodeComponentInfo = {
fromNodeData(processPart.node.data)
}
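The new `sideOutput` helper above routes every side-output lookup through `StreamExecutionEnvPreparer.sideOutputGetter`, so `FlinkProcessRegistrar` no longer calls `getSideOutput` directly and a compatibility overlay built for a different Flink version can substitute its own implementation. A minimal sketch of what such an override could look like (the trait name and its body are assumptions for illustration, not part of this commit):

```scala
package pl.touk.nussknacker.engine.process.registrar

import org.apache.flink.streaming.api.datastream.{DataStream, SingleOutputStreamOperator}
import org.apache.flink.util.OutputTag

// Hypothetical mixin: only the side-output accessor changes; the registrar itself
// stays version-agnostic because it always goes through sideOutputGetter.
trait LegacySideOutputSupport extends StreamExecutionEnvPreparer {
  override def sideOutputGetter[T](singleOutputStreamOperator: SingleOutputStreamOperator[_],
                                   outputTag: OutputTag[T]): DataStream[T] =
    // Replace this call with whatever signature an older Flink runtime exposes.
    singleOutputStreamOperator.getSideOutput(outputTag)
}
```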
@@ -1,9 +1,10 @@
package pl.touk.nussknacker.engine.process.registrar

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.datastream.{DataStream, SingleOutputStreamOperator}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.util.{FlinkUserCodeClassLoaders, OutputTag}
import pl.touk.nussknacker.engine.api.StreamMetaData
import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerData
@@ -24,6 +25,8 @@ trait StreamExecutionEnvPreparer {
def postRegistration(env: StreamExecutionEnvironment, compiledProcessWithDeps: FlinkProcessCompilerData, deploymentData: DeploymentData): Unit

def flinkClassLoaderSimulation: ClassLoader

def sideOutputGetter[T](singleOutputStreamOperator: SingleOutputStreamOperator[_], outputTag: OutputTag[T]): DataStream[T]
}

class DefaultStreamExecutionEnvPreparer(checkpointConfig: Option[CheckpointConfig],
@@ -71,8 +74,21 @@ class DefaultStreamExecutionEnvPreparer(checkpointConfig: Option[CheckpointConfi
}

override def flinkClassLoaderSimulation: ClassLoader = {
FlinkUserCodeClassLoaders.childFirst(Array.empty,
wrapInLambda(() => FlinkUserCodeClassLoaders.childFirst(Array.empty,
Thread.currentThread().getContextClassLoader, Array.empty, (t: Throwable) => throw t, true
)
))
}

override def sideOutputGetter[T](singleOutputStreamOperator: SingleOutputStreamOperator[_], outputTag: OutputTag[T]): DataStream[T] = {
wrapInLambda(() => singleOutputStreamOperator.getSideOutput(outputTag))
}

/*
 * This is a slightly hacky way to make compatibility overrides easier.
 * We wrap an incompatible API invocation in a lambda (i.e. a new class in bytecode) to defer initialization
 * and make DefaultStreamExecutionEnvPreparer usable with older Flink versions.
 * Otherwise, ClassNotFound/MethodNotFound exceptions are thrown during class initialization.
 */
private def wrapInLambda[T](obj: ()=>T): T = obj()
}

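As a side note on the `wrapInLambda` trick: a Scala lambda compiles to its own class (or an invokedynamic call site), so the reference to a potentially incompatible API is only resolved when the lambda is actually invoked, not when the enclosing class is loaded. A standalone sketch of the idea; `maybeMissingApi` is a made-up placeholder, not a Flink call:

```scala
object LinkDeferralSketch {

  // Stand-in for an API that only exists in newer library versions.
  def maybeMissingApi(): String = "new API available"

  // Same shape as the helper in the diff: just forces the thunk.
  private def wrapInLambda[T](obj: () => T): T = obj()

  def describe(useNewApi: Boolean): String =
    if (useNewApi) wrapInLambda(() => maybeMissingApi())
    else "fallback path, new API never touched"

  def main(args: Array[String]): Unit =
    // With useNewApi = false the lambda body never runs, so even if maybeMissingApi
    // were absent at runtime, no linkage error would surface on this path.
    println(describe(useNewApi = false))
}
```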
@@ -34,7 +34,7 @@ trait FlinkStubbedRunner {
val streamGraph = env.getStreamGraph
streamGraph.setJobName(process.id)

val jobGraph = streamGraph.getJobGraph(null)
val jobGraph = streamGraph.getJobGraph()
jobGraph.setClasspaths(modelData.modelClassLoader.urls.asJava)
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings)

2 changes: 1 addition & 1 deletion engine/flink/management/src/it/resources/docker/Dockerfile
@@ -1,4 +1,4 @@
FROM apache/flink:1.15.2-scala_${scala.major.version}-java11
FROM flink:1.16.0-scala_${scala.major.version}-java11

COPY entrypointWithIP.sh /
COPY conf.yml /
