modules rename 'avro' => 'schemedkafka'
gadomsky committed Aug 8, 2022
1 parent f0d3f52 commit f6a9ed8
Showing 180 changed files with 661 additions and 669 deletions.
@@ -7,10 +7,10 @@ import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.typeutils.{LogicalTypesGenericRecordAvroTypeInfo, LogicalTypesGenericRecordWithSchemaIdAvroTypeInfo}
import org.openjdk.jmh.annotations._
-import pl.touk.nussknacker.engine.avro.kryo.AvroSerializersRegistrar
-import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentUtils
-import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, MockSchemaRegistryClient}
-import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.kryo.SchemaIdBasedAvroGenericRecordSerializer
+import pl.touk.nussknacker.engine.schemedkafka.kryo.AvroSerializersRegistrar
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentUtils
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, MockSchemaRegistryClient}
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.kryo.SchemaIdBasedAvroGenericRecordSerializer
import pl.touk.nussknacker.engine.benchmarks.serialization.SerializationBenchmarkSetup
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, SchemaRegistryClientKafkaConfig}

@@ -2,8 +2,8 @@ package pl.touk.nussknacker.engine.benchmarks.serialization.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
-import pl.touk.nussknacker.engine.avro.{AvroUtils, LogicalTypesGenericRecordBuilder}
-import pl.touk.nussknacker.engine.avro.schemaregistry.GenericRecordWithSchemaId
+import pl.touk.nussknacker.engine.schemedkafka.{AvroUtils, LogicalTypesGenericRecordBuilder}
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.GenericRecordWithSchemaId

object AvroSamples {

28 changes: 14 additions & 14 deletions build.sbt
@@ -606,7 +606,7 @@ lazy val flinkDevModel = (project in flink("management/dev-model")).
)
}
).
-dependsOn(flinkAvroComponentsUtils,
+dependsOn(flinkSchemedKafkaComponentsUtils,
flinkComponentsUtils % Provided,
componentsUtils,
//TODO: NodeAdditionalInfoProvider & ComponentExtractor should probably be moved to API?
@@ -704,7 +704,7 @@ lazy val benchmarks = (project in file("benchmarks")).
Jmh / classDirectory := (Test / classDirectory).value,
Jmh / dependencyClasspath := (Test / dependencyClasspath).value,
Jmh / generateJmhSourcesAndResources := (Jmh / generateJmhSourcesAndResources).dependsOn(Test / compile).value,
-).dependsOn(interpreter, flinkAvroComponentsUtils, flinkExecutor, flinkBaseComponents, testUtils % "test")
+).dependsOn(interpreter, flinkSchemedKafkaComponentsUtils, flinkExecutor, flinkBaseComponents, testUtils % "test")


lazy val kafkaUtils = (project in utils("kafka-utils")).
@@ -737,10 +737,10 @@ lazy val kafkaComponentsUtils = (project in utils("kafka-components-utils")).
}
).dependsOn(kafkaUtils, componentsUtils % Provided, componentsApi % Provided, testUtils % "it, test")

-lazy val avroComponentsUtils = (project in utils("avro-components-utils")).
+lazy val schemedKafkaComponentsUtils = (project in utils("schemed-kafka-components-utils")).
settings(commonSettings).
settings(
name := "nussknacker-avro-components-utils",
name := "nussknacker-schemed-kafka-components-utils",
libraryDependencies ++= {
Seq(
"io.confluent" % "kafka-json-schema-provider" % confluentV excludeAll(
@@ -762,10 +762,10 @@ lazy val avroComponentsUtils = (project in utils("avro-components-utils")).
}
).dependsOn(componentsUtils % Provided, kafkaComponentsUtils, interpreter % "test", kafkaTestUtils % "test", jsonUtils)

-lazy val flinkAvroComponentsUtils = (project in flink("avro-components-utils")).
+lazy val flinkSchemedKafkaComponentsUtils = (project in flink("schemed-kafka-components-utils")).
settings(commonSettings).
settings(
name := "nussknacker-flink-avro-components-utils",
name := "nussknacker-flink-schemed-kafka-components-utils",
libraryDependencies ++= {
Seq(
"org.apache.flink" %% "flink-streaming-scala" % flinkV % "provided",
@@ -775,7 +775,7 @@ lazy val flinkAvroComponentsUtils = (project in flink("avro-components-utils")).
)
}
)
-.dependsOn(avroComponentsUtils, flinkKafkaComponentsUtils, flinkExtensionsApi % Provided, flinkComponentsUtils % Provided, componentsUtils % Provided,
+.dependsOn(schemedKafkaComponentsUtils, flinkKafkaComponentsUtils, flinkExtensionsApi % Provided, flinkComponentsUtils % Provided, componentsUtils % Provided,
kafkaTestUtils % "test", flinkTestUtils % "test", flinkExecutor % "test")

lazy val flinkKafkaComponentsUtils = (project in flink("kafka-components-utils")).
@@ -845,7 +845,7 @@ lazy val liteComponentsTestkit = (project in utils("lite-components-testkit")).
settings(commonSettings).
settings(
name := "nussknacker-lite-components-testkit",
-).dependsOn(componentsTestkit, requestResponseRuntime, liteEngineRuntime, avroComponentsUtils)
+).dependsOn(componentsTestkit, requestResponseRuntime, liteEngineRuntime, schemedKafkaComponentsUtils)


lazy val commonUtils = (project in utils("utils")).
@@ -982,7 +982,7 @@ lazy val liteKafkaComponents = (project in lite("components/kafka")).
liteEngineKafkaComponentsApi % Provided,
liteComponentsApi % Provided,
componentsUtils % Provided,
-avroComponentsUtils,
+schemedKafkaComponentsUtils,
liteComponentsTestkit % Test
)

@@ -1027,7 +1027,7 @@ lazy val liteEngineKafkaIntegrationTest: Project = (project in lite("kafka/integ
"com.dimafeng" %% "testcontainers-scala-kafka" % testcontainersScalaV % "it",
"com.softwaremill.sttp.client" %% "async-http-client-backend-future" % sttpV % "it"
)
).dependsOn(interpreter % "it", avroComponentsUtils % "it", testUtils % "it", kafkaTestUtils % "it", httpUtils % "it")
).dependsOn(interpreter % "it", schemedKafkaComponentsUtils % "it", testUtils % "it", kafkaTestUtils % "it", httpUtils % "it")

lazy val liteEngineKafkaComponentsApi = (project in lite("kafka/components-api")).
settings(commonSettings).
@@ -1341,7 +1341,7 @@ lazy val flinkKafkaComponents = (project in flink("components/kafka")).
settings(publishAssemblySettings: _*).
settings(
name := "nussknacker-flink-kafka-components",
-).dependsOn(flinkComponentsApi % Provided, flinkKafkaComponentsUtils, flinkAvroComponentsUtils, commonUtils % Provided, componentsUtils % Provided)
+).dependsOn(flinkComponentsApi % Provided, flinkKafkaComponentsUtils, flinkSchemedKafkaComponentsUtils, commonUtils % Provided, componentsUtils % Provided)

lazy val copyUiDist = taskKey[Unit]("copy ui")
lazy val copyUiSubmodulesDist = taskKey[Unit]("copy ui submodules")
@@ -1466,7 +1466,7 @@ lazy val ui = (project in file("ui/server"))
liteEmbeddedDeploymentManager % "provided",
liteK8sDeploymentManager % "provided",
kafkaUtils % "provided",
avroComponentsUtils % "provided",
schemedKafkaComponentsUtils % "provided",
requestResponseRuntime % "provided",
developmentTestsDeploymentManager % "provided",
)
@@ -1513,9 +1513,9 @@ lazy val bom = (project in file("bom"))
lazy val modules = List[ProjectReference](
requestResponseRuntime, requestResponseApp, flinkDeploymentManager, flinkPeriodicDeploymentManager, flinkDevModel, flinkDevModelJava, defaultModel,
openapiComponents, interpreter, benchmarks, kafkaUtils, kafkaComponentsUtils, kafkaTestUtils, componentsUtils, componentsTestkit, helpersUtils, commonUtils, utilsInternal, testUtils,
-flinkExecutor, flinkAvroComponentsUtils, flinkKafkaComponentsUtils, flinkComponentsUtils, flinkTests, flinkTestUtils, flinkComponentsApi, flinkExtensionsApi,
+flinkExecutor, flinkSchemedKafkaComponentsUtils, flinkKafkaComponentsUtils, flinkComponentsUtils, flinkTests, flinkTestUtils, flinkComponentsApi, flinkExtensionsApi,
requestResponseComponentsUtils, requestResponseComponentsApi, componentsApi, extensionsApi, security, processReports, httpUtils,
-restmodel, listenerApi, deploymentManagerApi, ui, sqlComponents, avroComponentsUtils, flinkBaseComponents, flinkKafkaComponents,
+restmodel, listenerApi, deploymentManagerApi, ui, sqlComponents, schemedKafkaComponentsUtils, flinkBaseComponents, flinkKafkaComponents,
liteComponentsApi, liteEngineKafkaComponentsApi, liteEngineRuntime, liteBaseComponents, liteKafkaComponents, liteEngineKafkaRuntime, liteEngineKafkaIntegrationTest, liteEmbeddedDeploymentManager, liteK8sDeploymentManager,
liteRequestResponseComponents, scenarioApi, commonApi, jsonUtils, liteComponentsTestkit, flinkComponentsTestkit
)
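For a downstream build that pulls these modules in by artifact name, the rename above is a one-line change per dependency. A minimal sketch, assuming the `pl.touk.nussknacker` organization and a `nussknackerV` placeholder for the release in use (both hypothetical here, not taken from this diff):

```scala
// build.sbt of a hypothetical downstream project — before this commit
libraryDependencies ++= Seq(
  "pl.touk.nussknacker" %% "nussknacker-avro-components-utils" % nussknackerV,
  "pl.touk.nussknacker" %% "nussknacker-flink-avro-components-utils" % nussknackerV
)

// after this commit — the same modules under their new names
libraryDependencies ++= Seq(
  "pl.touk.nussknacker" %% "nussknacker-schemed-kafka-components-utils" % nussknackerV,
  "pl.touk.nussknacker" %% "nussknacker-flink-schemed-kafka-components-utils" % nussknackerV
)
```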
6 changes: 6 additions & 0 deletions docs/MigrationGuide.md
@@ -48,6 +48,11 @@ To see the biggest differences please consult the [changelog](Changelog.md).
* Changes of `ValidationMode`, fields: `acceptUnfilledOptional` and `acceptRedundant` were removed
* [#3376](https://github.com/TouK/nussknacker/pull/3376) `FlinkKafkaSource.flinkSourceFunction`, `FlinkKafkaSource.createFlinkSource` and `DelayedFlinkKafkaConsumer.apply` takes additional argument, `FlinkCustomNodeContext` now
* [#3272](https://github.com/TouK/nussknacker/pull/3272) `KafkaZookeeperServer` renamed to `EmbeddedKafkaServer`, `zooKeeperServer` field changed type to `Option` and is hidden now.
+* [#3365](https://github.com/TouK/nussknacker/pull/3365) Numerous renames:
+  * module `nussknacker-avro-components-utils` -> `nussknacker-schemed-kafka-components-utils`
+  * module `nussknacker-flink-avro-components-utils` -> `nussknacker-flink-schemed-kafka-components-utils`
+  * package `pl.touk.nussknacker.engine.avro` -> `pl.touk.nussknacker.engine.schemedkafka`
+  * object `KafkaAvroBaseComponentTransformer` -> `KafkaUniversalComponentTransformer`

### REST API changes

@@ -61,6 +66,7 @@ To see the biggest differences please consult the [changelog](Changelog.md).

### Breaking changes
* [#3328](https://github.com/TouK/nussknacker/pull/3328) Due to addition of support for different schema type (AvroSchema and JsonSchema for now) serialization format of `NkSerializableParsedSchema` has changed. Flink state compatibility of scenarios which use Avro sources or sinks has been broken.
+* [#3365](https://github.com/TouK/nussknacker/pull/3365) Due to renames (see section `Code API changes`) Flink state compatibility of scenarios which use Avro sources or sinks has been broken.

### Other changes

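In user code, the package rename listed in the migration guide above is mechanical: only the `pl.touk.nussknacker.engine.avro` prefix changes. A minimal before/after sketch, using two imports that appear verbatim elsewhere in this commit's diff:

```scala
// before this commit
import pl.touk.nussknacker.engine.avro.AvroUtils
import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentUtils

// after this commit — same classes, renamed package
import pl.touk.nussknacker.engine.schemedkafka.AvroUtils
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentUtils
```

A project-wide find-and-replace of that prefix covers the imports; the Flink state incompatibility called out under `Breaking changes` above cannot be patched this way.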

This file was deleted.

@@ -5,11 +5,11 @@ import com.typesafe.config.ConfigValueFactory.fromAnyRef
import net.ceedubs.ficus.Ficus.{booleanValueReader, optionValueReader, toFicusConfig}
import pl.touk.nussknacker.engine.api.component.{ComponentDefinition, ComponentProvider, NussknackerVersion}
import pl.touk.nussknacker.engine.api.process.ProcessObjectDependencies
-import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
-import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory}
-import pl.touk.nussknacker.engine.avro.sink.flink.{FlinkKafkaAvroSinkImplFactory, FlinkKafkaUniversalSinkImplFactory}
-import pl.touk.nussknacker.engine.avro.sink.{KafkaAvroSinkFactory, KafkaAvroSinkFactoryWithEditor, UniversalKafkaSinkFactory}
-import pl.touk.nussknacker.engine.avro.source.{KafkaAvroSourceFactory, UniversalKafkaSourceFactory}
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory}
+import pl.touk.nussknacker.engine.schemedkafka.sink.flink.{FlinkKafkaAvroSinkImplFactory, FlinkKafkaUniversalSinkImplFactory}
+import pl.touk.nussknacker.engine.schemedkafka.sink.{KafkaAvroSinkFactory, KafkaAvroSinkFactoryWithEditor, UniversalKafkaSinkFactory}
+import pl.touk.nussknacker.engine.schemedkafka.source.{KafkaAvroSourceFactory, UniversalKafkaSourceFactory}
import pl.touk.nussknacker.engine.kafka.KafkaConfig
import pl.touk.nussknacker.engine.kafka.generic.sinks.GenericKafkaJsonSinkFactory
import pl.touk.nussknacker.engine.kafka.generic.sources.{GenericJsonSourceFactory, GenericTypedJsonSourceFactory}
@@ -14,12 +14,12 @@ import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.component.{ComponentGroupName, ParameterConfig, SingleComponentConfig}
import pl.touk.nussknacker.engine.api.definition.{FixedExpressionValue, FixedValuesParameterEditor, MandatoryParameterValidator, StringParameterEditor}
import pl.touk.nussknacker.engine.api.process._
-import pl.touk.nussknacker.engine.avro.schemaregistry.SchemaBasedSerdeProvider
-import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
-import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientFactory, MockSchemaRegistryClient}
-import pl.touk.nussknacker.engine.avro.sink.flink.{FlinkKafkaAvroSinkImplFactory, FlinkKafkaUniversalSinkImplFactory}
-import pl.touk.nussknacker.engine.avro.sink.{KafkaAvroSinkFactoryWithEditor, UniversalKafkaSinkFactory}
-import pl.touk.nussknacker.engine.avro.source.{KafkaAvroSourceFactory, UniversalKafkaSourceFactory}
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.SchemaBasedSerdeProvider
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentSchemaBasedSerdeProvider
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory, MockConfluentSchemaRegistryClientFactory, MockSchemaRegistryClient}
+import pl.touk.nussknacker.engine.schemedkafka.sink.flink.{FlinkKafkaAvroSinkImplFactory, FlinkKafkaUniversalSinkImplFactory}
+import pl.touk.nussknacker.engine.schemedkafka.sink.{KafkaAvroSinkFactoryWithEditor, UniversalKafkaSinkFactory}
+import pl.touk.nussknacker.engine.schemedkafka.source.{KafkaAvroSourceFactory, UniversalKafkaSourceFactory}
import pl.touk.nussknacker.engine.flink.util.sink.{EmptySink, SingleValueSinkFactory}
import pl.touk.nussknacker.engine.flink.util.source.{EspDeserializationSchema, ReturningClassInstanceSource, ReturningTestCaseClass}
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, SchemaRegistryCacheConfig}
@@ -35,8 +35,8 @@
import org.apache.flink.formats.avro.utils.DataOutputEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import pl.touk.nussknacker.engine.avro.AvroUtils;
-import pl.touk.nussknacker.engine.avro.schema.StringForcingDatumReaderProvider;
+import pl.touk.nussknacker.engine.schemedkafka.AvroUtils;
+import pl.touk.nussknacker.engine.schemedkafka.schema.StringForcingDatumReaderProvider;

import java.util.Optional;

@@ -35,7 +35,7 @@
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import pl.touk.nussknacker.engine.avro.AvroUtils;
+import pl.touk.nussknacker.engine.schemedkafka.AvroUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -0,0 +1 @@
+pl.touk.nussknacker.engine.schemedkafka.kryo.AvroSerializersRegistrar
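The one-line file added above has the shape of a Java ServiceLoader registration (a resources file named after the service interface, containing the implementation's fully qualified class name), presumably re-created under the new package; the deleted file noted earlier is presumably its old-package counterpart. A minimal sketch of how such a registration is discovered at runtime, assuming `SerializersRegistrar` (imported later in this diff) is the service interface:

```scala
import java.util.ServiceLoader
import scala.jdk.CollectionConverters._
import pl.touk.nussknacker.engine.flink.api.serialization.SerializersRegistrar

// List every registrar visible on the classpath; after this commit the renamed
// pl.touk.nussknacker.engine.schemedkafka.kryo.AvroSerializersRegistrar should appear.
val registrars = ServiceLoader.load(classOf[SerializersRegistrar]).asScala.toList
registrars.foreach(r => println(r.getClass.getName))
```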
@@ -1,4 +1,4 @@
-package pl.touk.nussknacker.engine.avro.kryo
+package pl.touk.nussknacker.engine.schemedkafka.kryo

import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
@@ -7,9 +7,9 @@ import org.apache.avro.generic.GenericData
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.AvroUtils
import pl.touk.nussknacker.engine.api.component.ComponentProviderConfig
-import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory}
-import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.kryo.SchemaIdBasedAvroGenericRecordSerializer
-import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.serialization.GenericRecordSchemaIdSerializationSupport
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{CachedConfluentSchemaRegistryClientFactory, ConfluentSchemaRegistryClientFactory}
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.kryo.SchemaIdBasedAvroGenericRecordSerializer
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.serialization.GenericRecordSchemaIdSerializationSupport
import pl.touk.nussknacker.engine.flink.api.serialization.SerializersRegistrar
import pl.touk.nussknacker.engine.kafka.KafkaConfig

@@ -1,12 +1,12 @@
-package pl.touk.nussknacker.engine.avro.schemaregistry.confluent
+package pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent

import com.typesafe.scalalogging.LazyLogging
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import org.apache.avro.specific.SpecificRecordBase
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.typeutils.{LogicalTypesAvroTypeInfo, LogicalTypesGenericRecordAvroTypeInfo, LogicalTypesGenericRecordWithSchemaIdAvroTypeInfo}
-import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.serialization.GenericRecordSchemaIdSerializationSupport
-import pl.touk.nussknacker.engine.avro.{AvroUtils, RuntimeSchemaData}
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.serialization.GenericRecordSchemaIdSerializationSupport
+import pl.touk.nussknacker.engine.schemedkafka.{AvroUtils, RuntimeSchemaData}
import pl.touk.nussknacker.engine.kafka.KafkaConfig

import scala.reflect.{ClassTag, classTag}
@@ -1,13 +1,13 @@
-package pl.touk.nussknacker.engine.avro.schemaregistry.confluent.kryo
+package pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.kryo

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.avro.generic.GenericData
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
-import pl.touk.nussknacker.engine.avro.schema.DatumReaderWriterMixin
-import pl.touk.nussknacker.engine.avro.schemaregistry.GenericRecordWithSchemaId
-import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.ConfluentUtils
-import pl.touk.nussknacker.engine.avro.schemaregistry.confluent.client.ConfluentSchemaRegistryClientFactory
+import pl.touk.nussknacker.engine.schemedkafka.schema.DatumReaderWriterMixin
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.GenericRecordWithSchemaId
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentUtils
+import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.ConfluentSchemaRegistryClientFactory
import pl.touk.nussknacker.engine.flink.api.serialization.SerializerWithSpecifiedClass
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, SchemaRegistryClientKafkaConfig}

@@ -1,13 +1,13 @@
-package pl.touk.nussknacker.engine.avro.sink.flink
+package pl.touk.nussknacker.engine.schemedkafka.sink.flink

import io.confluent.kafka.schemaregistry.ParsedSchema
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema
import pl.touk.nussknacker.engine.api.LazyParameter
import pl.touk.nussknacker.engine.api.process.Sink
-import pl.touk.nussknacker.engine.avro.RuntimeSchemaData
-import pl.touk.nussknacker.engine.avro.encode.ValidationMode
-import pl.touk.nussknacker.engine.avro.sink.KafkaAvroSinkImplFactory
+import pl.touk.nussknacker.engine.schemedkafka.RuntimeSchemaData
+import pl.touk.nussknacker.engine.schemedkafka.encode.ValidationMode
+import pl.touk.nussknacker.engine.schemedkafka.sink.KafkaAvroSinkImplFactory
import pl.touk.nussknacker.engine.kafka.serialization.KafkaSerializationSchema
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, PreparedKafkaTopic}
import pl.touk.nussknacker.engine.util.KeyedValue
