Flink 1.16 #3683

Merged — 7 commits, Nov 22, 2022

Changes from all commits
7 changes: 3 additions & 4 deletions build.sbt
@@ -227,7 +227,6 @@ lazy val commonSettings =
"com.typesafe.akka" %% "akka-stream" % akkaV,
"com.typesafe.akka" %% "akka-testkit" % akkaV,

-      // akka-actor depends on old, 0.8 version
"org.scala-lang.modules" %% "scala-java8-compat" % scalaCompatV,

//security features
@@ -253,7 +252,7 @@ lazy val commonSettings =
)
)

val flinkV = "1.15.2"
val flinkV = "1.16.0"
val avroV = "1.11.0"
//we should use max(version used by confluent, version acceptable by flink), https://docs.confluent.io/platform/current/installation/versions-interoperability.html - confluent version reference
//TODO: upgrade to 3.x after flink upgrade: flink up to version 1.15 doesn't accept kafka 3.x because org.apache.kafka.common.Metric.value was renamed to metricValue - it should be changed in flink 1.16
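As background for the TODO above: the incompatibility comes from a rename in the kafka-clients API, where `Metric.value()` (returning a double) was removed in Kafka 3.x in favour of `Metric.metricValue()` (returning `Object`). A minimal sketch of code written against the 3.x accessor — the object and method names below are illustrative, not from this repository:

```scala
import org.apache.kafka.common.Metric

// Sketch only: reading a numeric metric through the Kafka 3.x API.
object KafkaMetricCompat {
  def metricAsDouble(metric: Metric): Option[Double] =
    metric.metricValue() match {
      case d: java.lang.Double => Some(d) // numeric gauges come back as boxed doubles
      case _                   => None    // some metrics report non-numeric values
    }
}
```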
@@ -274,7 +273,7 @@ val scalaParsersV = "1.0.4"
val everitSchemaV = "1.14.1"
val slf4jV = "1.7.30"
val scalaLoggingV = "3.9.2"
val scalaCompatV = "0.9.1"
val scalaCompatV = "1.0.2"
val ficusV = "1.4.7"
val configV = "1.4.1"
val commonsLangV = "3.3.2"
@@ -1516,7 +1515,7 @@ lazy val bom = (project in file("bom"))
lazy val modules = List[ProjectReference](
requestResponseRuntime, liteEngineRuntimeApp, flinkDeploymentManager, flinkPeriodicDeploymentManager, flinkDevModel, flinkDevModelJava, defaultModel,
openapiComponents, interpreter, benchmarks, kafkaUtils, kafkaComponentsUtils, kafkaTestUtils, componentsUtils, componentsTestkit, defaultHelpers, commonUtils, utilsInternal, testUtils,
-  flinkExecutor, flinkSchemedKafkaComponentsUtils, flinkKafkaComponentsUtils, flinkComponentsUtils, flinkTests, flinkTestUtils, flinkComponentsApi, flinkExtensionsApi,
+  flinkExecutor, flinkSchemedKafkaComponentsUtils, flinkKafkaComponentsUtils, flinkComponentsUtils, flinkTests, flinkTestUtils, flinkComponentsApi, flinkExtensionsApi, flinkScalaUtils,
requestResponseComponentsUtils, requestResponseComponentsApi, componentsApi, extensionsApi, security, processReports, httpUtils,
restmodel, listenerApi, deploymentManagerApi, designer, sqlComponents, schemedKafkaComponentsUtils, flinkBaseComponents, flinkKafkaComponents,
liteComponentsApi, liteEngineKafkaComponentsApi, liteEngineRuntime, liteBaseComponents, liteKafkaComponents, liteKafkaComponentsTests, liteEngineKafkaRuntime, liteEngineKafkaIntegrationTest, liteEmbeddedDeploymentManager, liteK8sDeploymentManager,
FlinkProcessRegistrar.scala
@@ -162,15 +162,15 @@ class FlinkProcessRegistrar(compileProcess: (CanonicalProcess, ProcessVersion, D
val typeInformationForTi = InterpretationResultTypeInformation.create(typeInformationDetection, part.contextBefore)
val typeInformationForVC = typeInformationDetection.forContext(part.contextBefore)

-        registerSubsequentPart(start.getSideOutput(new OutputTag[InterpretationResult](part.id, typeInformationForTi))
+        registerSubsequentPart(sideOutput(start, new OutputTag[InterpretationResult](part.id, typeInformationForTi))
.map((value: InterpretationResult) => value.finalContext, typeInformationForVC), part)
}.foldLeft(Map[BranchEndDefinition, BranchEndData]()) {
_ ++ _
}
val branchForEnds = part.ends.collect {
case TypedEnd(be: BranchEnd, validationContext) =>
val ti = InterpretationResultTypeInformation.create(typeInformationDetection, validationContext)
-          be.definition -> BranchEndData(validationContext, start.getSideOutput(new OutputTag[InterpretationResult](be.nodeId, ti)))
+          be.definition -> BranchEndData(validationContext, sideOutput(start, new OutputTag[InterpretationResult](be.nodeId, ti)))
}.toMap
branchesForParts ++ branchForEnds
}
@@ -193,8 +193,8 @@ class FlinkProcessRegistrar(compileProcess: (CanonicalProcess, ProcessVersion, D
val typeInformationForIR = InterpretationResultTypeInformation.create(typeInformationDetection, contextBefore)
val typeInformationForCtx = typeInformationDetection.forContext(contextBefore)
// TODO: for sinks there are no further nodes to interpret but the function is registered to invoke listeners (e.g. to measure end metrics).
-      val afterInterpretation = registerInterpretationPart(start, part, SinkInterpretationName)
-        .getSideOutput(new OutputTag[InterpretationResult](FlinkProcessRegistrar.EndId, typeInformationForIR))
+      val afterInterpretation = sideOutput(registerInterpretationPart(start, part, SinkInterpretationName),
+        new OutputTag[InterpretationResult](FlinkProcessRegistrar.EndId, typeInformationForIR))
.map((value: InterpretationResult) => value.finalContext, typeInformationForCtx)
val customNodeContext = nodeContext(nodeComponentInfoFrom(part), Left(contextBefore))
val withValuePrepared = sink.prepareValue(afterInterpretation, customNodeContext)
@@ -270,6 +270,9 @@ class FlinkProcessRegistrar(compileProcess: (CanonicalProcess, ProcessVersion, D

}

+  private def sideOutput[T](stream: SingleOutputStreamOperator[_], tag: OutputTag[T])
+    = streamExecutionEnvPreparer.sideOutputGetter(stream, tag)

private def nodeComponentInfoFrom(processPart: ProcessPart): NodeComponentInfo = {
fromNodeData(processPart.node.data)
}
StreamExecutionEnvPreparer.scala
@@ -1,9 +1,10 @@
package pl.touk.nussknacker.engine.process.registrar

import com.typesafe.scalalogging.LazyLogging
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders
import org.apache.flink.runtime.state.StateBackend
+import org.apache.flink.streaming.api.datastream.{DataStream, SingleOutputStreamOperator}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.util.{FlinkUserCodeClassLoaders, OutputTag}
import pl.touk.nussknacker.engine.api.StreamMetaData
import pl.touk.nussknacker.engine.deployment.DeploymentData
import pl.touk.nussknacker.engine.process.compiler.FlinkProcessCompilerData
@@ -24,6 +25,8 @@ trait StreamExecutionEnvPreparer {
def postRegistration(env: StreamExecutionEnvironment, compiledProcessWithDeps: FlinkProcessCompilerData, deploymentData: DeploymentData): Unit

def flinkClassLoaderSimulation: ClassLoader

+  def sideOutputGetter[T](singleOutputStreamOperator: SingleOutputStreamOperator[_], outputTag: OutputTag[T]): DataStream[T]
}

class DefaultStreamExecutionEnvPreparer(checkpointConfig: Option[CheckpointConfig],
@@ -71,8 +74,21 @@ class DefaultStreamExecutionEnvPreparer(checkpointConfig: Option[CheckpointConfi
}

override def flinkClassLoaderSimulation: ClassLoader = {
-    FlinkUserCodeClassLoaders.childFirst(Array.empty,
+    wrapInLambda(() => FlinkUserCodeClassLoaders.childFirst(Array.empty,
       Thread.currentThread().getContextClassLoader, Array.empty, (t: Throwable) => throw t, true
-    )
+    ))
}

+  override def sideOutputGetter[T](singleOutputStreamOperator: SingleOutputStreamOperator[_], outputTag: OutputTag[T]): DataStream[T] = {
+    wrapInLambda(() => singleOutputStreamOperator.getSideOutput(outputTag))
+  }

+  /*
+   * This is a slightly hacky way to make compatibility overrides easier.
+   * We wrap the incompatible API invocation in a lambda (i.e. a new class in bytecode) to defer initialization,
+   * making DefaultStreamExecutionEnvPreparer usable with older Flink versions.
+   * Otherwise, ClassNotFound/MethodNotFound exceptions would be thrown during class initialization.
+   */
+  private def wrapInLambda[T](obj: () => T): T = obj()
}
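As a side note on the `wrapInLambda` trick above: the lambda body is compiled into a separate synthetic class, so the reference to a version-specific API lives outside `DefaultStreamExecutionEnvPreparer` itself. A self-contained sketch of the idea — `VersionSpecificApi` and `CompatWrapper` are made-up names, not code from this PR:

```scala
// Stand-in for an API that exists only on some Flink versions.
object VersionSpecificApi {
  def call(): String = "present only on newer versions"
}

object CompatWrapper {
  private def wrapInLambda[T](f: () => T): T = f()

  // The reference to VersionSpecificApi sits in the synthetic class generated
  // for the lambda, so CompatWrapper loads and verifies even when that API is
  // missing at runtime; per the comment in the PR, a linkage error could only
  // surface if entryPoint() (and thus the lambda) were actually executed.
  def entryPoint(): String = wrapInLambda(() => VersionSpecificApi.call())
}
```

A compatibility layer for an older Flink can therefore subclass DefaultStreamExecutionEnvPreparer and override flinkClassLoaderSimulation and sideOutputGetter, and the 1.16-specific bodies behind the lambdas are never evaluated.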

FlinkStubbedRunner.scala
@@ -34,7 +34,7 @@ trait FlinkStubbedRunner {
val streamGraph = env.getStreamGraph
streamGraph.setJobName(process.id)

-    val jobGraph = streamGraph.getJobGraph(null)
+    val jobGraph = streamGraph.getJobGraph()
jobGraph.setClasspaths(modelData.modelClassLoader.urls.asJava)
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings)

2 changes: 1 addition & 1 deletion engine/flink/management/src/it/resources/docker/Dockerfile
@@ -1,4 +1,4 @@
-FROM apache/flink:1.15.2-scala_${scala.major.version}-java11
+FROM flink:1.16.0-scala_${scala.major.version}-java11

COPY entrypointWithIP.sh /
COPY conf.yml /