Skip to content

Commit

Permalink
Merge pull request #35 from TouK/1.15.0-release
Browse files Browse the repository at this point in the history
1.15.0 release
Updated CsvSource to extend StandardFlinkSource
  • Loading branch information
ForrestFairy committed May 23, 2024
2 parents df00b35 + 3fecad0 commit 5bda11e
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package pl.touk.nussknacker.sample.csv

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import pl.touk.nussknacker.engine.flink.api.process.BasicFlinkSource
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import pl.touk.nussknacker.engine.flink.api.process.{ExplicitTypeInformationSource, FlinkCustomNodeContext, StandardFlinkSource, StandardFlinkSourceFunctionUtils}
import pl.touk.nussknacker.engine.flink.api.timestampwatermark.{StandardTimestampWatermarkHandler, TimestampWatermarkHandler}

import java.io.File
Expand All @@ -12,17 +13,18 @@ import java.time.Duration
class CsvSource[T: TypeInformation](file: File,
separator: Char,
createRecord: Array[String] => T,
extractTimestamp: SerializableTimestampAssigner[T]) extends BasicFlinkSource[T] {

override def flinkSourceFunction: SourceFunction[T] = {
new CsvSourceFunction[T](file, separator, createRecord)
}
extractTimestamp: SerializableTimestampAssigner[T])
extends StandardFlinkSource[T]
with ExplicitTypeInformationSource[T] {

override def typeInformation: TypeInformation[T] = {
implicitly[TypeInformation[T]]
}

override def timestampAssigner: Option[TimestampWatermarkHandler[T]] = {
Some(StandardTimestampWatermarkHandler.boundedOutOfOrderness(extractTimestamp, maxOutOfOrderness = Duration.ofMinutes(10)))
Some(StandardTimestampWatermarkHandler.boundedOutOfOrderness(Some(extractTimestamp), maxOutOfOrderness = Duration.ofMinutes(10), None))
}

override protected def sourceStream(env: StreamExecutionEnvironment, flinkNodeContext: FlinkCustomNodeContext): DataStreamSource[T] =
StandardFlinkSourceFunctionUtils.createSourceStream(env, new CsvSourceFunction[T](file, separator, createRecord), typeInformation)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class SampleComponentProviderTest extends Matchers with ValidatedValuesDetailedM
.streaming("test scenario")
.source("custom-source-node-name", TestScenarioRunner.testDataSource)
.enricher("component-provider-service-node-name", "output", "randomString", "length" -> "#input")
.processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#output")
.emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#output")

val runner = TestScenarioRunner
.flinkBased(config, flinkMiniCluster)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package pl.touk.nussknacker.sample.csv
import org.junit.jupiter.api.Test
import org.scalatest.Inside.inside
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError
import pl.touk.nussknacker.engine.api.component.ComponentDefinition
import pl.touk.nussknacker.engine.api.context.ProcessCompilationError.CustomNodeError
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.build.ScenarioBuilder
import pl.touk.nussknacker.engine.flink.util.test.FlinkTestScenarioRunner._
Expand Down Expand Up @@ -33,7 +33,7 @@ class CallDetailRecordSourceTest extends Matchers with ValidatedValuesDetailedMe
val scenario = ScenarioBuilder
.streaming("test scenario")
.source("cdr source", "cdr", "fileName" -> s"'${cdrsFile.getFileName.toString}'")
.processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#input")
.emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#input")
val runner = TestScenarioRunner
.flinkBased(config, flinkMiniCluster)
.withExtraComponents(ComponentDefinition("cdr", CallDetailRecordSourceFactory.prepare(cdrsFile.getParent.toString, ';')) :: Nil)
Expand All @@ -59,7 +59,7 @@ class CallDetailRecordSourceTest extends Matchers with ValidatedValuesDetailedMe
val scenario = ScenarioBuilder
.streaming("test scenario")
.source("cdr source", "cdr", "fileName" -> s"'unexisting.csv'")
.processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#input")
.emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#input")
val runner = TestScenarioRunner
.flinkBased(config, flinkMiniCluster)
.withExtraComponents(ComponentDefinition("cdr", CallDetailRecordSourceFactory.prepare("/tmp", ';')) :: Nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class CsvSourceTest extends Matchers with ValidatedValuesDetailedMessage {
val scenario = ScenarioBuilder
.streaming("test scenario")
.source("source", "csvSource", "fileName" -> s"'${csvFile.getFileName.toString}'", "definition" -> "{{'name', 'String'}, {'phoneNumber', 'Long'}}")
.processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#input")
.emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#input")
val runner = TestScenarioRunner
.flinkBased(config, flinkMiniCluster)
.withExtraComponents(ComponentDefinition("csvSource", new GenericCsvSourceFactory(csvFile.getParent.toString, ';')) :: Nil)
Expand Down Expand Up @@ -81,7 +81,7 @@ class CsvSourceTest extends Matchers with ValidatedValuesDetailedMessage {
val scenario = ScenarioBuilder
.streaming("test scenario")
.source("source", "csvSource", params: _*)
.processorEnd("end", TestScenarioRunner.testResultService, "value" -> "#input")
.emptySink("end", TestScenarioRunner.testResultSink, "value" -> "#input")
val runner = TestScenarioRunner
.flinkBased(config, flinkMiniCluster)
.withExtraComponents(ComponentDefinition("csvSource", new GenericCsvSourceFactory("/tmp", ';')) :: Nil)
Expand Down
2 changes: 1 addition & 1 deletion nussknacker.version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.14.0
1.15.0

0 comments on commit 5bda11e

Please sign in to comment.