diff --git a/common/build.gradle b/common/build.gradle index 463987ee..82df3944 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -25,8 +25,8 @@ artifacts { } java { - sourceCompatibility = JavaVersion.VERSION_11 - targetCompatibility = JavaVersion.VERSION_11 + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 } repositories { @@ -39,15 +39,15 @@ dependencies { implementation 'org.slf4j:slf4j-simple:2.0.7' implementation('org.apache.kafka:kafka-clients') { version { - strictly '3.7.0' + strictly '3.8.0' } } - implementation 'io.confluent:kafka-streams-avro-serde:7.5.1' + implementation 'io.confluent:kafka-streams-avro-serde:7.7.0' testImplementation 'junit:junit:4.13.2' testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.2' testImplementation 'org.hamcrest:hamcrest:2.2' - testImplementation 'org.testcontainers:testcontainers:1.19.3' - testImplementation 'org.testcontainers:kafka:1.19.3' + testImplementation 'org.testcontainers:testcontainers:1.20.1' + testImplementation 'org.testcontainers:kafka:1.20.1' testImplementation 'commons-codec:commons-codec:1.17.0' testImplementation 'org.apache.flink:flink-sql-connector-kafka:3.2.0-1.19' testImplementation 'org.apache.flink:flink-connector-base:1.19.1' diff --git a/confluent-parallel-consumer-application/kafka/build.gradle b/confluent-parallel-consumer-application/kafka/build.gradle index 2741d61c..38e60c89 100644 --- a/confluent-parallel-consumer-application/kafka/build.gradle +++ b/confluent-parallel-consumer-application/kafka/build.gradle @@ -31,7 +31,7 @@ repositories { dependencies { implementation project(':common') implementation "org.slf4j:slf4j-simple:2.0.7" - implementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.4" + implementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.3.2" implementation "org.apache.commons:commons-lang3:3.12.0" implementation "me.tongfei:progressbar:0.9.3" implementation 'org.awaitility:awaitility:4.2.0' @@ -41,7 +41,7 @@ dependencies { testImplementation 'org.junit.jupiter:junit-jupiter-api:5.9.2' testImplementation 'org.hamcrest:hamcrest:2.2' testImplementation 'org.awaitility:awaitility:4.2.0' - testImplementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.2.4:tests" // for LongPollingMockConsumer + testImplementation "io.confluent.parallelconsumer:parallel-consumer-core:0.5.3.2:tests" // for LongPollingMockConsumer testRuntimeOnly 'org.junit.platform:junit-platform-launcher' testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.9.2' } diff --git a/data-contracts/.gitignore b/data-contracts/.gitignore new file mode 100644 index 00000000..4feefcdd --- /dev/null +++ b/data-contracts/.gitignore @@ -0,0 +1,23 @@ +### Gradle template +.gradle +**/build/ +!producer-app-schema-v1/src/**/build/ + +# Ignore Gradle GUI config +gradle-app.setting + +# Avoid ignoring Gradle wrapper jar file (.jar files are usually ignored) +!gradle-wrapper.jar + +# Avoid ignore Gradle wrappper properties +!gradle-wrapper.properties + +# Cache of project +.gradletasknamecache + +# Eclipse Gradle plugin generated files +# Eclipse Core +.project +# JDT-specific (Eclipse Java Development Tools) +.classpath + diff --git a/data-contracts/README.md b/data-contracts/README.md new file mode 100644 index 00000000..00bf0f64 --- /dev/null +++ b/data-contracts/README.md @@ -0,0 +1,376 @@ +# Managing Data Contracts in Confluent Cloud + +Data contracts consist not only of the schemas to define the structure of events, but also rulesets 
allowing for more fine-grained validations,
+controls, and discovery. In this tutorial, we'll evolve a schema and add data quality and migration rules.
+
+![Data contracts workflow overview](images/overview.png)
+
+In the workflow above, we see these tools in action:
+* The [Confluent Terraform Provider](https://registry.terraform.io/providers/confluentinc/confluent/latest/docs) is used to define Confluent Cloud assets (Kafka cluster(s), Data Governance, Kafka Topics, and Schema Configurations).
+* Using the newly created Schema Registry, data engineers and architects define the schema of the events that comprise the organization's canonical data model - i.e., entities, events, and commands that are shared across applications - along with other parts of the data contract. This includes data quality rules, metadata, and migration rules. A gradle plugin is utilized to register the schemas and related elements of the data contract with the Schema Registry.
+* Applications which produce and/or consume these event types can download the schemas from the Schema Registry. In our example, this is a JVM application built using Gradle. A gradle plugin is used to download the schemas, after which another gradle plugin is used to generate Java classes from those schemas - thus providing the application with compile-time type safety.
+
+We will cover these steps in detail.
+
+## Running the Example
+
+In this tutorial we'll create Confluent Cloud infrastructure - including a Kafka cluster and Schema Registry. Then we'll create
+a Kafka topic named `membership-avro` to store `Membership` events. The Apache Avro schema is maintained and managed in this repo
+along with metadata and migration rules.
+
+We will evolve the `membership` schema, refactoring the events to encapsulate the date-related fields of version 1 into its
+own `record` type in version 2. Typically this would be a breaking change. However, [data migration rules](https://docs.confluent.io/cloud/current/sr/fundamentals/data-contracts.html#migration-rules) in the schema registry
+allow us to perform this schema change without breaking producers or consumers.
+
+At the time of this writing, this data contract functionality is available to the Java, Go, and .NET Confluent client implementations. We'll update this example as other clients evolve.
+
+### Prerequisites
+
+Clone the `confluentinc/tutorials` GitHub repository (if you haven't already) and navigate to the `tutorials` directory:
+
+```shell
+git clone git@github.com:confluentinc/tutorials.git
+cd tutorials
+```
+
+Here are the tools needed to run this tutorial:
+* [Confluent Cloud](http://confluent.cloud)
+* [Confluent CLI](https://docs.confluent.io/confluent-cli/current/install.html)
+* [Terraform](https://developer.hashicorp.com/terraform/install?product_intent=terraform)
+* [jq](https://jqlang.github.io/jq/)
+* JDK 17
+* IDE of choice
+
+> [!NOTE]
+> When installing and configuring the Confluent CLI, include the Confluent Cloud credentials as environment variables for future use. For instance, with bash or zsh, include these export statements:
+>
+> ```shell
+> export CONFLUENT_CLOUD_API_KEY=
+> export CONFLUENT_CLOUD_API_SECRET=
+> ```
+>
+
+### Executing Terraform
+
+To create Confluent Cloud assets, change to the `cc-terraform` subdirectory. We'll step through the commands and what they do.
+
+#### Create Confluent Cloud Assets
+
+Terraform can use the value of any environment variable whose name begins with `TF_VAR_` as the value of the corresponding Terraform variable.
+For more on this functionality, see the [terraform documentation](https://developer.hashicorp.com/terraform/cli/config/environment-variables#tf_var_name).
+
+Our example requires that we set the value of the `org_id` variable from Confluent Cloud. This denotes which organization will house the Confluent Cloud assets we are creating. So let's export the Confluent Cloud organization ID to a Terraform environment variable.
+
+This command may open a browser window asking you to authenticate to Confluent Cloud. Once that's complete, the result of
+`confluent organization list` is queried by `jq` to extract the `id` of the current organization to which you are authenticated:
+
+```shell
+export TF_VAR_org_id=$(confluent organization list -o json | jq -c -r '.[] | select(.is_current)' | jq '.id')
+```
+
+Now we are ready to initialize the Terraform environment, create a plan, and apply that plan to create the Confluent Cloud assets:
+
+```shell
+terraform init
+terraform plan -out "tfplan"
+terraform apply "tfplan"
+```
+
+Let's have a look at what we've created. We now have a Confluent Cloud environment, which includes a Kafka cluster and the Data Governance platform with Confluent Schema Registry. Within the Kafka cluster, we create a Kafka topic named `membership-avro`.
+
+```terraform
+resource "confluent_kafka_topic" "membership_avro" {
+  topic_name = "membership-avro"
+  kafka_cluster {
+    id = confluent_kafka_cluster.kafka_cluster.id
+  }
+  rest_endpoint = confluent_kafka_cluster.kafka_cluster.rest_endpoint
+...
+  partitions_count = 10
+...
+}
+```
+
+Then we configure the subject containing the value schemas for this topic using `confluent_subject_config`. This tells the Schema Registry that `membership-avro-value` schemas will be `BACKWARD` compatible and defines an attribute called `major_version` to be used as the `compatibilityGroup`.
+
+```terraform
+resource "confluent_subject_config" "membership_value_avro" {
+  subject_name = "${confluent_kafka_topic.membership_avro.topic_name}-value"
+
+  schema_registry_cluster {
+    id = data.confluent_schema_registry_cluster.advanced.id
+  }
+  rest_endpoint = data.confluent_schema_registry_cluster.advanced.rest_endpoint
+...
+  compatibility_level = "BACKWARD"
+  compatibility_group = "major_version"
+...
+}
+```
+
+For more on these data contract configuration enhancements, refer to the Confluent Cloud docs on [data contracts](https://docs.confluent.io/cloud/current/sr/fundamentals/data-contracts.html#configuration-enhancements).
+
+
+#### Prepare Client Properties
+
+This demo has tailored the output of `terraform apply` to return the properties needed to connect to Confluent Cloud. The command below will
+reformat the names of those properties into the names used in Kafka client configurations, then export those outputs to a properties file
+in our project:
+
+```shell
+terraform output -json \
+  | jq -r 'to_entries | map( {key: .key|tostring|split("_")|join("."), value: .value} ) | map("\(.key)=\(.value.value)") | .[]' \
+  | while read -r line ; do echo "$line"; done > ../shared/src/main/resources/confluent.properties
+```
+
+All Kafka client code in this project loads connection properties from `shared/src/main/resources/confluent.properties`. For an example of this
+properties file, see [confluent.properties.orig](shared/src/main/resources/confluent.properties.orig).
+
+> [!NOTE]
+> The file-based approach we're using here is NOT recommended for a production-quality application.
+> A secrets manager - which the major cloud providers all offer - or a tool like HashiCorp Vault would be better suited. Such a tool would also provide client libraries in a Maven repository for JVM applications to access the secrets.
+
+### Producing and Consuming Events
+
+We'll create producer and consumer classes to configure and provide Kafka clients for our application(s). In the `shared` module there are implementations to encapsulate this behavior:
+
+```kotlin
+abstract class BaseProducer<Key, Value>(propOverrides: Map<String, Any> = mapOf()) {
+    ...
+    val kafkaProducer: KafkaProducer<Key, Value> = KafkaProducer(getProducerProperties(propOverrides))
+
+    open fun send(topicName: String, key: Key, value: Value): Future<RecordMetadata>? {
+        ...
+    }
+}
+
+abstract class BaseConsumer<Key, Value>(propOverrides: Map<String, Any> = mapOf()) {
+    ...
+    val kafkaConsumer: KafkaConsumer<Key, Value> = KafkaConsumer(getConsumerProperties(propOverrides))
+
+    fun start(topics: List<String>) {
+        kafkaConsumer.subscribe(topics)
+        while (true) {
+            val records = kafkaConsumer.poll(Duration.ofSeconds(5))
+            for (record in records) {
+                logger.trace("Record from ${record.topic()}, ${record.partition()}, ${record.offset()}")
+                consumeRecord(record.key(), record.value())
+            }
+        }
+    }
+
+    abstract fun consumeRecord(key: Key, value: Value)
+}
+```
+
+Build the `shared` module from the root of the `tutorials` repo:
+```shell
+./gradlew :data-contracts:shared:build
+```
+
+As the schemas evolve, we'll create implementations of these base classes to produce and consume events with specific versions of the schemas.
+
+### Schema Evolution
+
+Schemas are CODE! As such, they will evolve to meet new business requirements. In the upcoming sections, we'll create JVM applications - written in Kotlin and built with Gradle - using different versions of the schema. We will utilize Data Governance migration rules, allowing us to make what would ordinarily be "breaking changes" to the schema, with a mapping between the versions applied based on Kafka client configuration.
+
+#### Working with Version 1
+
+The `membership` schema begins as a fairly "flat" data model, where the fields denoting start and end dates are at the top level of the object:
+
+```avroschema
+{
+  "name": "Membership",
+  "namespace": "io.confluent.devrel",
+  "type": "record",
+  "fields": [
+    {"name": "user_id", "type": "string"},
+    {"name": "start_date", "type": {"type": "int", "logicalType": "date"}},
+    {"name": "end_date", "type": {"type": "int", "logicalType": "date"}}
+  ]
+}
+```
+
+Using the [schema-registry-plugin](https://github.com/ImFlog/schema-registry-plugin) for gradle, we can register this schema with the Confluent Schema Registry.
+
+```kotlin
+schemaRegistry {
+    ...
+    register {
+        subject(inputSubject = "membership-avro-value", type = "AVRO", file = "$avroSchemaDir/membership_v1.avsc")
+            .setMetadata("$metadataDir/membership_major_version_1.json")
+    }
+}
+```
+
+Remember the `compatibility_group` parameter from the subject configuration? This now comes into play as we set the metadata on the schema to denote this version as `major_version=1`:
+
+```json
+{
+  "properties": {
+    "major_version": 1
+  }
+}
+```
+
+Schema registration is completed via the `registerSchemasTask` task of the `schemas` module:
+
+```shell
+./gradlew :data-contracts:schemas:registerSchemasTask
+```
+
+To use this schema in an application - `app-schema-v1` - we utilize the same gradle plugin's `downloadSchemasTask`. First we define the subjects we want to download.
+In this example, we download the schema to the `src/main/avro` directory:
+
+```kotlin
+schemaRegistry {
+    download {
+        // download the membership avro schema, version 1
+        subject("membership-avro-value", "${projectDir}/src/main/avro", 1)
+    }
+}
+```
+
+With the schema(s) downloaded, we next want to generate Java code to provide our application with compile-time bindings to serialize and deserialize events to and from the `membership-avro` topic. Let's use the [gradle-avro-dependency-plugin](https://github.com/bakdata/gradle-avro-dependency-plugin) to generate code, using the `generateAvroJava` task.
+
+For simplicity, we have wrapped `downloadSchemasTask` and `generateAvroJava` in a custom gradle task named `generateCode`, executed as follows:
+
+```shell
+./gradlew :data-contracts:app-schema-v1:generateCode
+```
+
+Now we implement classes to produce and consume events with this schema. First, the producer class, extending `BaseProducer`:
+
+```kotlin
+class MembershipProducer: BaseProducer<String, Membership>(mapOf(
+    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
+    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroSerializer",
+    ProducerConfig.CLIENT_ID_CONFIG to "membership-producer-app-v1",
+    AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT to true,
+    AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to false,
+    AbstractKafkaSchemaSerDeConfig.USE_LATEST_WITH_METADATA to "major_version=1"
+))
+```
+
+This implementation must pass the configuration parameters required to serialize the key and value of events to the `membership-avro` topic. In this use case, the key is a `String` and the value is of type `Membership` - generated from the schema. The value uses the schema-registry-aware `Serializer` implementation `KafkaAvroSerializer`. The underlying serializer is also configured NOT to use the latest version of the schema (`use.latest.version=false`), instead resolving the schema whose metadata matches `major_version=1` (`use.latest.with.metadata`), with strict compatibility checks enabled (`latest.compatibility.strict=true`). For more on these settings, see the Confluent docs' explanation of [the differences between preregistered and client-derived schemas](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#handling-differences-between-preregistered-and-client-derived-schemas).
+
+Let's also extend the `BaseConsumer` class, configured to use this `major_version` of the schema:
+
+```kotlin
+class MembershipConsumer: BaseConsumer<String, Membership>(mapOf(
+    ConsumerConfig.GROUP_ID_CONFIG to "app-schema-v1",
+    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
+    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
+    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroDeserializer",
+    AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT to true,
+    AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to false,
+    AbstractKafkaSchemaSerDeConfig.USE_LATEST_WITH_METADATA to "major_version=1"
+)) {
+
+    override fun consumeRecord(
+        key: String,
+        value: Membership
+    ) {
+        logger.info("Received Membership ${key}, ${value}")
+    }
+}
+```
+
+The `consumeRecord` function is where we would typically start the business logic of actually "consuming" Kafka events. This implementation simply logs the consumed records using the `Logger` instance provided by the superclass.
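+
+As a quick illustration of how these pieces fit together, here is a minimal sketch of building and sending a version 1 `Membership` event with the generated builder. It is a trimmed-down version of what the `ApplicationMain` class described next does; the helper function name is illustrative, and the Avro `date` logical type maps to `java.time.LocalDate` in the generated code:
+
+```kotlin
+import io.confluent.devrel.Membership
+import java.time.LocalDate
+import java.util.UUID
+
+fun sendSampleMembership(producer: MembershipProducer) {
+    val userId = UUID.randomUUID().toString()
+    // Build a v1 Membership event using the class generated from the downloaded schema.
+    val membership = Membership.newBuilder()
+        .setUserId(userId)
+        .setStartDate(LocalDate.now().minusDays(30))
+        .setEndDate(LocalDate.now().plusWeeks(52))
+        .build()
+    // Key the record by user_id; the value is serialized by KafkaAvroSerializer against schema version 1.
+    producer.send("membership-avro", userId, membership)
+}
+```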
+
+To exercise these producer and consumer implementations, we created a `main` function in the `ApplicationMain` class to start a consumer instance and a producer instance that periodically sends random events to the `membership-avro` topic. Running [this application](./app-schema-v1/src/main/kotlin/io/confluent/devrel/dc/v1/ApplicationMain.kt) will print the events being consumed from Kafka:
+
+```shell
+[Thread-0] INFO io.confluent.devrel.datacontracts.shared.BaseConsumer - Received Membership 8e65d8e2-4ad8-475f-8f74-d724865ddbd8, {"user_id": "8e65d8e2-4ad8-475f-8f74-d724865ddbd8", "start_date": "2022-06-13", "end_date": "2025-03-25"}
+```
+
+#### Evolving to Version 2
+
+The decision is made to refactor the `membership` schema to encapsulate the date fields into a type - `ValidityPeriod` - which can be reused in other event types.
+
+```avroschema
+{
+  "name": "ValidityPeriod",
+  "namespace": "io.confluent.devrel",
+  "type": "record",
+  "fields": [
+    {"name": "from", "type": {"type": "int", "logicalType": "date"}},
+    {"name": "to", "type": {"type": "int", "logicalType": "date"}}
+  ]
+}
+```
+
+The refactored `membership` schema now references this new type:
+
+```avroschema
+{
+  "name": "Membership",
+  "namespace": "io.confluent.devrel",
+  "type": "record",
+  "fields": [
+    { "name": "user_id", "type": "string" },
+    { "name": "validity_period", "type": "io.confluent.devrel.ValidityPeriod" }
+  ]
+}
+```
+
+When we register version 2 of the `membership-avro-value` subject, we use `addLocalReference` to include the `ValidityPeriod` type:
+
+```kotlin
+schemaRegistry {
+    register {
+        subject(inputSubject = "membership-avro-value", type = "AVRO", file = "$avroSchemaDir/membership_v1.avsc")
+            .setMetadata("$metadataDir/membership_major_version_1.json")
+
+        subject(inputSubject = "membership-avro-value", type = "AVRO", file = "$avroSchemaDir/membership_v2.avsc")
+            .addLocalReference("validity-period", "$avroSchemaDir/validity_period.avsc")
+            .setMetadata("$metadataDir/membership_major_version_2.json")
+            .setRuleSet("$rulesetDir/membership_migration_rules.json")
+    }
+}
+```
+
+Download the version 2 schema to the version 2 application and generate the Java classes for this schema:
+
+```shell
+./gradlew :data-contracts:app-schema-v2:generateCode
+```
+
+In the `app-schema-v2` module, we'll find a new implementation of a `MembershipConsumer`:
+
+```kotlin
+class MembershipConsumer: BaseConsumer<String, Membership>(mapOf(
+    ConsumerConfig.GROUP_ID_CONFIG to "app-schema-v2",
+    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
+    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer",
+    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroDeserializer",
+    AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT to true,
+    AbstractKafkaSchemaSerDeConfig.USE_LATEST_WITH_METADATA to "major_version=2"
+)) {
+    override fun consumeRecord(key: String, value: Membership) {
+        logger.info("v2 - Received Membership ${key}, ${value}")
+    }
+}
+```
+
+A closer look at the configuration of this consumer shows that we no longer set `use.latest.version=false` and now specify `major_version=2`
+in the `use.latest.with.metadata` configuration for the deserializer.
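+
+So how does a version 2 consumer read events that were produced with version 1 (and vice versa)? That is the job of the migration rules attached to version 2 of the subject via `setRuleSet`. They are defined in `schemas/src/main/rulesets/membership_migration_rules.json` as JSONata transforms: the `UPGRADE` rule moves `start_date` and `end_date` into `validity_period` when a client on schema version 2 reads an older event, and the `DOWNGRADE` rule does the reverse:
+
+```json
+{
+  "migrationRules": [
+    {
+      "name": "move_start_and_end_date_to_validity_period",
+      "kind": "TRANSFORM",
+      "type": "JSONATA",
+      "mode": "UPGRADE",
+      "expr": "$merge([$sift($, function($v, $k) {$k != 'start_date' and $k != 'end_date'}), {'validity_period': {'from':start_date,'to':end_date}}])"
+    },
+    {
+      "name": "move_validity_period_to_start_date_and_end_date",
+      "kind": "TRANSFORM",
+      "type": "JSONATA",
+      "mode": "DOWNGRADE",
+      "expr": "$merge([$sift($, function($v, $k) {$k != 'validity_period'}), {'start_date': validity_period.from, 'end_date': validity_period.to}])"
+    }
+  ]
+}
+```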
+
+Running the `main` function in `ApplicationV2Main` will consume the events from the `membership-avro` topic, but with a noticeable difference from the `app-schema-v1` application:
+
+```shell
+[Thread-0] INFO io.confluent.devrel.datacontracts.shared.BaseConsumer - v2 - Received Membership af39d3a4-53b3-4d27-9bf0-3528965b6149, {"user_id": "af39d3a4-53b3-4d27-9bf0-3528965b6149", "validity_period": {"from": "2022-10-29", "to": "2025-08-28"}}
+```
+
+We're consuming the same events, but they are deserialized with version 2 of the schema.
+
+## Teardown
+
+
+When you're done with the tutorial, issue this command from the `cc-terraform` directory to destroy the Confluent Cloud environment
+we created:
+
+```shell
+terraform destroy -auto-approve
+```
+
+Check the Confluent Cloud console to ensure this environment no longer exists.
+
+
diff --git a/data-contracts/app-schema-v1/build.gradle.kts b/data-contracts/app-schema-v1/build.gradle.kts
new file mode 100644
index 00000000..2047cfd3
--- /dev/null
+++ b/data-contracts/app-schema-v1/build.gradle.kts
@@ -0,0 +1,93 @@
+import java.io.FileInputStream
+import java.util.Properties
+
+buildscript {
+    repositories {
+        mavenCentral()
+        maven("https://packages.confluent.io/maven/")
+        maven("https://jitpack.io")
+        gradlePluginPortal()
+    }
+}
+
+plugins {
+    kotlin("jvm") version "2.0.21"
+    id("com.google.protobuf") version "0.9.4"
+    id("com.github.imflog.kafka-schema-registry-gradle-plugin") version "2.1.0"
+    id("com.bakdata.avro") version "1.2.1"
+}
+
+group = "io.confluent.devrel"
+
+repositories {
+    mavenCentral()
+    maven("https://packages.confluent.io/maven/")
+    maven("https://jitpack.io")
+}
+
+sourceSets {
+    main {
+        kotlin.srcDirs("src/main/kotlin", "build/generated-main-avro-java")
+    }
+}
+
+dependencies {
+    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1")
+    implementation(project(":data-contracts:shared"))
+
+    implementation("org.apache.kafka:kafka-clients:${project.property("kafkaVersion")}")
+    implementation("io.confluent:kafka-avro-serializer:${project.property("confluentVersion")}")
+
+    implementation("io.github.serpro69:kotlin-faker:${project.property("fakerVersion")}")
+    implementation("io.github.serpro69:kotlin-faker-books:${project.property("fakerVersion")}")
+    implementation("io.github.serpro69:kotlin-faker-tech:${project.property("fakerVersion")}")
+
+    implementation("org.jetbrains.kotlinx:kotlinx-cli:0.3.6")
+    implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.6.1")
+}
+
+kotlin {
+    jvmToolchain(17)
+}
+
+val schemaRegOutputDir = "${project.projectDir.absolutePath}/build/schema-registry-plugin"
+
+tasks.downloadSchemasTask {
+    doFirst {
+        mkdir(schemaRegOutputDir)
+    }
+}
+
+schemaRegistry {
+    val srProperties = Properties()
+    // At the moment, this is a file with which we are LOCALLY aware.
+    // In an ACTUAL CI/CD workflow, this would be externalized, perhaps provided from a base build image or other parameter.
+ srProperties.load(FileInputStream(File("${project.projectDir.absolutePath}/../shared/src/main/resources/confluent.properties"))) + + url = srProperties.getProperty("schema.registry.url") + + val srCredTokens = srProperties.get("basic.auth.user.info").toString().split(":") + credentials { + username = srCredTokens[0] + password = srCredTokens[1] + } + outputDirectory = "${System.getProperty("user.home")}/tmp/schema-registry-plugin" + pretty = true + + download { + // download the membership avro schema, version 1 + subject("membership-avro-value", "${projectDir}/src/main/avro", 1) + } +} + +tasks.clean { + doFirst { + delete(fileTree("${projectDir}/src/main/avro/").include("**/*.avsc")) + } +} + +tasks.register("generateCode") { + group = "source generation" + description = "wrapper task for all source generation" + dependsOn("downloadSchemasTask", "generateAvroJava", "generateProto") +} diff --git a/data-contracts/app-schema-v1/gradle.properties b/data-contracts/app-schema-v1/gradle.properties new file mode 100644 index 00000000..3fd23b72 --- /dev/null +++ b/data-contracts/app-schema-v1/gradle.properties @@ -0,0 +1,6 @@ +confluentVersion=7.8.0 +fakerVersion=2.0.0-rc.3 +grpcVersion=1.15.1 +kafkaVersion=3.8.0 +protobufVersion=3.6.1 +slf4jVersion=2.0.11 diff --git a/data-contracts/app-schema-v1/settings.gradle.kts b/data-contracts/app-schema-v1/settings.gradle.kts new file mode 100644 index 00000000..67734339 --- /dev/null +++ b/data-contracts/app-schema-v1/settings.gradle.kts @@ -0,0 +1 @@ +rootProject.name="app-schema-v1" diff --git a/data-contracts/app-schema-v1/src/main/avro/.gitignore b/data-contracts/app-schema-v1/src/main/avro/.gitignore new file mode 100644 index 00000000..c5cab8d4 --- /dev/null +++ b/data-contracts/app-schema-v1/src/main/avro/.gitignore @@ -0,0 +1 @@ +*.avsc \ No newline at end of file diff --git a/data-contracts/app-schema-v1/src/main/kotlin/io/confluent/devrel/dc/v1/ApplicationMain.kt b/data-contracts/app-schema-v1/src/main/kotlin/io/confluent/devrel/dc/v1/ApplicationMain.kt new file mode 100644 index 00000000..4e4a0951 --- /dev/null +++ b/data-contracts/app-schema-v1/src/main/kotlin/io/confluent/devrel/dc/v1/ApplicationMain.kt @@ -0,0 +1,72 @@ +package io.confluent.devrel.dc.v1 + +import io.confluent.devrel.Membership +import io.confluent.devrel.dc.v1.kafka.MembershipConsumer +import io.confluent.devrel.dc.v1.kafka.MembershipProducer +import kotlinx.cli.ArgParser +import kotlinx.cli.ArgType +import kotlinx.cli.default +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.datetime.Clock +import java.time.LocalDate +import java.util.* +import kotlin.concurrent.thread +import kotlin.random.Random +import kotlin.time.DurationUnit +import kotlin.time.toDuration + +class ApplicationMain { + + companion object { + + + @JvmStatic + fun main(args: Array) { + runBlocking { + println("Starting application main...") + println(args.joinToString(" ")) + val parser = ArgParser("schema-v1") + + val interval by parser.option(ArgType.Int, + shortName = "i", fullName = "interval", + description = "message send interval, seconds") + .default(1) + val duration by parser.option(ArgType.Int, + shortName = "d", fullName = "duration", + description = "how long to run, seconds") + .default(100) + parser.parse(args) + + val messageInterval = interval.toDuration(DurationUnit.SECONDS) + val sendDuration = duration.toDuration(DurationUnit.SECONDS) + + val producer = 
MembershipProducer() + val consumer = MembershipConsumer() + + thread { + consumer.start(listOf("membership-avro")) + } + + coroutineScope { + launch { + val until = Clock.System.now().plus(sendDuration) + while(Clock.System.now().compareTo(until) < 0) { + val userId = UUID.randomUUID().toString() + val membership = Membership.newBuilder() + .setUserId(userId) + .setStartDate(LocalDate.now().minusDays(Random.nextLong(100, 1000))) + .setEndDate(LocalDate.now().plusWeeks(Random.nextLong(1, 52))) + .build() + producer.send("membership-avro", userId, membership) + delay(messageInterval.inWholeSeconds) + } + } + } + producer.close() + } + } + } +} \ No newline at end of file diff --git a/data-contracts/app-schema-v1/src/main/kotlin/io/confluent/devrel/dc/v1/kafka/MembershipConsumer.kt b/data-contracts/app-schema-v1/src/main/kotlin/io/confluent/devrel/dc/v1/kafka/MembershipConsumer.kt new file mode 100644 index 00000000..6777230d --- /dev/null +++ b/data-contracts/app-schema-v1/src/main/kotlin/io/confluent/devrel/dc/v1/kafka/MembershipConsumer.kt @@ -0,0 +1,24 @@ +package io.confluent.devrel.dc.v1.kafka + +import io.confluent.devrel.Membership +import io.confluent.devrel.datacontracts.shared.BaseConsumer +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig +import org.apache.kafka.clients.consumer.ConsumerConfig + +class MembershipConsumer: BaseConsumer(mapOf( + ConsumerConfig.GROUP_ID_CONFIG to "app-schema-v1", + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringDeserializer", + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroDeserializer", + AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT to true, + AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to false, + AbstractKafkaSchemaSerDeConfig.USE_LATEST_WITH_METADATA to "major_version=1" +)) { + + override fun consumeRecord( + key: String, + value: Membership + ) { + logger.info("Received Membership ${key}, ${value}") + } +} \ No newline at end of file diff --git a/data-contracts/app-schema-v1/src/main/kotlin/io/confluent/devrel/dc/v1/kafka/MembershipProducer.kt b/data-contracts/app-schema-v1/src/main/kotlin/io/confluent/devrel/dc/v1/kafka/MembershipProducer.kt new file mode 100644 index 00000000..0aeb53c3 --- /dev/null +++ b/data-contracts/app-schema-v1/src/main/kotlin/io/confluent/devrel/dc/v1/kafka/MembershipProducer.kt @@ -0,0 +1,15 @@ +package io.confluent.devrel.dc.v1.kafka + +import io.confluent.devrel.Membership +import io.confluent.devrel.datacontracts.shared.BaseProducer +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig +import org.apache.kafka.clients.producer.ProducerConfig + +class MembershipProducer: BaseProducer(mapOf( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer", + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroSerializer", + ProducerConfig.CLIENT_ID_CONFIG to "membership-producer-app-v1", + AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT to true, + AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to false, + AbstractKafkaSchemaSerDeConfig.USE_LATEST_WITH_METADATA to "major_version=1" +)) \ No newline at end of file diff --git a/data-contracts/app-schema-v1/src/main/resources/logback.xml b/data-contracts/app-schema-v1/src/main/resources/logback.xml new file mode 100644 index 00000000..b0d9c706 --- /dev/null +++ 
b/data-contracts/app-schema-v1/src/main/resources/logback.xml @@ -0,0 +1,25 @@ + + + + + + + + %d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + + + + diff --git a/data-contracts/app-schema-v2/build.gradle.kts b/data-contracts/app-schema-v2/build.gradle.kts new file mode 100644 index 00000000..a07aa387 --- /dev/null +++ b/data-contracts/app-schema-v2/build.gradle.kts @@ -0,0 +1,93 @@ +import java.io.FileInputStream +import java.util.Properties + +buildscript { + repositories { + mavenCentral() + maven("https://packages.confluent.io/maven/") + maven("https://jitpack.io") + gradlePluginPortal() + } +} + +plugins { + kotlin("jvm") version "2.0.21" + id("com.google.protobuf") version "0.9.4" + id("com.github.imflog.kafka-schema-registry-gradle-plugin") version "2.1.0" + id("com.bakdata.avro") version "1.2.1" +} + +group = "io.confluent.devrel" + +repositories { + mavenCentral() + maven("https://packages.confluent.io/maven/") + maven("https://jitpack.io") +} + +sourceSets { + main { + kotlin.srcDirs("src/main/kotlin", "build/generated-main-avro-java") + } +} + +dependencies { + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.1") + implementation(project(":data-contracts:shared")) + + implementation("org.apache.kafka:kafka-clients:${project.property("kafkaVersion")}") + implementation("io.confluent:kafka-avro-serializer:${project.property("confluentVersion")}") + + implementation("io.github.serpro69:kotlin-faker:${project.property("fakerVersion")}") + implementation("io.github.serpro69:kotlin-faker-books:${project.property("fakerVersion")}") + implementation("io.github.serpro69:kotlin-faker-tech:${project.property("fakerVersion")}") + + implementation("org.jetbrains.kotlinx:kotlinx-cli:0.3.6") + implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.6.1") +} + +kotlin { + jvmToolchain(17) +} + +val schemaRegOutputDir = "${project.projectDir.absolutePath}/build/schema-registry-plugin" + +tasks.downloadSchemasTask { + doFirst { + mkdir(schemaRegOutputDir) + } +} + +schemaRegistry { + val srProperties = Properties() + // At the moment, this is a file with which we are LOCALLY aware. + // In an ACTUAL CI/CD workflow, this would be externalized, perhaps provided from a base build image or other parameter. 
+ srProperties.load(FileInputStream(File("${project.projectDir.absolutePath}/../shared/src/main/resources/confluent.properties"))) + + url = srProperties.getProperty("schema.registry.url") + + val srCredTokens = srProperties.get("basic.auth.user.info").toString().split(":") + credentials { + username = srCredTokens[0] + password = srCredTokens[1] + } + outputDirectory = "${System.getProperty("user.home")}/tmp/schema-registry-plugin" + pretty = true + + download { + // download the membership avro schema, version 2 + subject("membership-avro-value", "${projectDir}/src/main/avro", 2) + } +} + +tasks.clean { + doFirst { + delete(fileTree("${projectDir}/src/main/avro/").include("**/*.avsc")) + } +} + +tasks.register("generateCode") { + group = "source generation" + description = "wrapper task for all source generation" + dependsOn("downloadSchemasTask", "generateAvroJava", "generateProto") +} diff --git a/data-contracts/app-schema-v2/gradle.properties b/data-contracts/app-schema-v2/gradle.properties new file mode 100644 index 00000000..3fd23b72 --- /dev/null +++ b/data-contracts/app-schema-v2/gradle.properties @@ -0,0 +1,6 @@ +confluentVersion=7.8.0 +fakerVersion=2.0.0-rc.3 +grpcVersion=1.15.1 +kafkaVersion=3.8.0 +protobufVersion=3.6.1 +slf4jVersion=2.0.11 diff --git a/data-contracts/app-schema-v2/settings.gradle.kts b/data-contracts/app-schema-v2/settings.gradle.kts new file mode 100644 index 00000000..f3f35313 --- /dev/null +++ b/data-contracts/app-schema-v2/settings.gradle.kts @@ -0,0 +1 @@ +rootProject.name="app-schema-v2" diff --git a/data-contracts/app-schema-v2/src/main/avro/.gitignore b/data-contracts/app-schema-v2/src/main/avro/.gitignore new file mode 100644 index 00000000..c5cab8d4 --- /dev/null +++ b/data-contracts/app-schema-v2/src/main/avro/.gitignore @@ -0,0 +1 @@ +*.avsc \ No newline at end of file diff --git a/data-contracts/app-schema-v2/src/main/kotlin/io/confluent/devrel/v2/ApplicationV2Main.kt b/data-contracts/app-schema-v2/src/main/kotlin/io/confluent/devrel/v2/ApplicationV2Main.kt new file mode 100644 index 00000000..32ac74fd --- /dev/null +++ b/data-contracts/app-schema-v2/src/main/kotlin/io/confluent/devrel/v2/ApplicationV2Main.kt @@ -0,0 +1,23 @@ +package io.confluent.devrel.v2 + +import io.confluent.devrel.v2.kafka.MembershipConsumer +import kotlinx.coroutines.runBlocking +import kotlin.concurrent.thread + +class ApplicationV2Main { + + companion object { + @JvmStatic + fun main(args: Array) { + runBlocking { + println("Starting application main...") + println(args.joinToString(" ")) + val consumer = MembershipConsumer() + + thread { + consumer.start(listOf("membership-avro")) + } + } + } + } +} \ No newline at end of file diff --git a/data-contracts/app-schema-v2/src/main/kotlin/io/confluent/devrel/v2/kafka/MembershipConsumer.kt b/data-contracts/app-schema-v2/src/main/kotlin/io/confluent/devrel/v2/kafka/MembershipConsumer.kt new file mode 100644 index 00000000..0c19a46b --- /dev/null +++ b/data-contracts/app-schema-v2/src/main/kotlin/io/confluent/devrel/v2/kafka/MembershipConsumer.kt @@ -0,0 +1,20 @@ +package io.confluent.devrel.v2.kafka + +import io.confluent.devrel.Membership +import io.confluent.devrel.datacontracts.shared.BaseConsumer +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig +import org.apache.kafka.clients.consumer.ConsumerConfig + +class MembershipConsumer: BaseConsumer(mapOf( + ConsumerConfig.GROUP_ID_CONFIG to "app-schema-v2", + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG 
to "org.apache.kafka.common.serialization.StringDeserializer", + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroDeserializer", + AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT to true, + AbstractKafkaSchemaSerDeConfig.USE_LATEST_WITH_METADATA to "major_version=2" +) +) { + override fun consumeRecord(key: String, value: Membership) { + logger.info("v2 - Received Membership ${key}, ${value}") + } +} \ No newline at end of file diff --git a/data-contracts/cc-terraform/.gitignore b/data-contracts/cc-terraform/.gitignore new file mode 100644 index 00000000..afd278b7 --- /dev/null +++ b/data-contracts/cc-terraform/.gitignore @@ -0,0 +1,6 @@ +.terraform.lock.hcl +.terraform/ +.tfplan +tfplan +terraform.tfstate +terraform.tfstate.backup \ No newline at end of file diff --git a/data-contracts/cc-terraform/kafka.tf b/data-contracts/cc-terraform/kafka.tf new file mode 100644 index 00000000..5384a8f8 --- /dev/null +++ b/data-contracts/cc-terraform/kafka.tf @@ -0,0 +1,127 @@ +# Update the config to use a cloud provider and region of your choice. +# https://registry.terraform.io/providers/confluentinc/confluent/latest/docs/resources/confluent_kafka_cluster +resource "confluent_kafka_cluster" "kafka_cluster" { + display_name = var.cc_cluster_name + availability = "SINGLE_ZONE" + cloud = var.cloud_provider + region = var.cloud_region + basic {} + environment { + id = confluent_environment.cc_env.id + } + + depends_on = [confluent_environment.cc_env] +} + +data "confluent_schema_registry_cluster" "advanced" { + environment { + id = confluent_environment.cc_env.id + } + depends_on = [confluent_kafka_cluster.kafka_cluster] +} + +// 'app-manager' service account is required in this configuration to create 'purchase' topic and grant ACLs +// to 'app-producer' and 'app-consumer' service accounts. +resource "confluent_service_account" "app-manager" { + display_name = "${var.cc_cluster_name}-app-manager" + description = "Service account to manage Kafka cluster" +} + +resource "confluent_role_binding" "app-manager-kafka-cluster-admin" { + principal = "User:${confluent_service_account.app-manager.id}" + role_name = "CloudClusterAdmin" + crn_pattern = confluent_kafka_cluster.kafka_cluster.rbac_crn +} + +resource "confluent_api_key" "app-manager-kafka-api-key" { + display_name = "app-manager-kafka-api-key" + description = "Kafka API Key that is owned by 'app-manager' service account" + owner { + id = confluent_service_account.app-manager.id + api_version = confluent_service_account.app-manager.api_version + kind = confluent_service_account.app-manager.kind + } + + managed_resource { + id = confluent_kafka_cluster.kafka_cluster.id + api_version = confluent_kafka_cluster.kafka_cluster.api_version + kind = confluent_kafka_cluster.kafka_cluster.kind + + environment { + id = confluent_environment.cc_env.id + } + } + + # The goal is to ensure that confluent_role_binding.app-manager-kafka-cluster-admin is created before + # confluent_api_key.app-manager-kafka-api-key is used to create instances of + # confluent_kafka_topic, confluent_kafka_acl resources. + + # 'depends_on' meta-argument is specified in confluent_api_key.app-manager-kafka-api-key to avoid having + # multiple copies of this definition in the configuration which would happen if we specify it in + # confluent_kafka_topic, confluent_kafka_acl resources instead. 
+ depends_on = [ + confluent_role_binding.app-manager-kafka-cluster-admin + ] +} + +resource "confluent_service_account" "env-manager" { + display_name = "${var.cc_cluster_name}-env-manager" + description = "Service account to manage 'Staging' environment" +} + +resource "confluent_role_binding" "env-manager-environment-admin" { + principal = "User:${confluent_service_account.env-manager.id}" + role_name = "EnvironmentAdmin" + crn_pattern = confluent_environment.cc_env.resource_name +} + +resource "confluent_api_key" "env-manager-schema-registry-api-key" { + display_name = "env-manager-schema-registry-api-key" + description = "Schema Registry API Key that is owned by 'env-manager' service account" + owner { + id = confluent_service_account.env-manager.id + api_version = confluent_service_account.env-manager.api_version + kind = confluent_service_account.env-manager.kind + } + + managed_resource { + id = data.confluent_schema_registry_cluster.advanced.id + api_version = data.confluent_schema_registry_cluster.advanced.api_version + kind = data.confluent_schema_registry_cluster.advanced.kind + + environment { + id = confluent_environment.cc_env.id + } + } + + # The goal is to ensure that confluent_role_binding.env-manager-environment-admin is created before + # confluent_api_key.env-manager-schema-registry-api-key is used to create instances of + # confluent_schema resources. + + # 'depends_on' meta-argument is specified in confluent_api_key.env-manager-schema-registry-api-key to avoid having + # multiple copies of this definition in the configuration which would happen if we specify it in + # confluent_schema resources instead. + depends_on = [ + confluent_role_binding.env-manager-environment-admin, + data.confluent_schema_registry_cluster.advanced + ] +} + +resource "confluent_schema_registry_cluster_config" "schema_registry_cluster_config" { + schema_registry_cluster { + id = data.confluent_schema_registry_cluster.advanced.id + } + rest_endpoint = data.confluent_schema_registry_cluster.advanced.rest_endpoint + compatibility_level = "BACKWARD" + credentials { + key = confluent_api_key.env-manager-schema-registry-api-key.id + secret = confluent_api_key.env-manager-schema-registry-api-key.secret + } + + depends_on = [data.confluent_schema_registry_cluster.advanced, + confluent_api_key.env-manager-schema-registry-api-key] + + lifecycle { + prevent_destroy = false + } +} \ No newline at end of file diff --git a/data-contracts/cc-terraform/main.tf b/data-contracts/cc-terraform/main.tf new file mode 100644 index 00000000..9ef84610 --- /dev/null +++ b/data-contracts/cc-terraform/main.tf @@ -0,0 +1,28 @@ +# Configure the Confluent Provider +terraform { + backend "local" { + workspace_dir = ".tfstate/terraform.state" + } + + required_providers { + confluent = { + source = "confluentinc/confluent" + version = "2.12.0" + } + } +} + +provider "confluent" { +} + +resource "confluent_environment" "cc_env" { + display_name = var.cc_env_display_name + + stream_governance { + package = "ADVANCED" + } + + lifecycle { + prevent_destroy = false + } +} diff --git a/data-contracts/cc-terraform/membership.tf b/data-contracts/cc-terraform/membership.tf new file mode 100644 index 00000000..46676d44 --- /dev/null +++ b/data-contracts/cc-terraform/membership.tf @@ -0,0 +1,40 @@ +resource "confluent_kafka_topic" "membership_avro" { + + topic_name = "membership-avro" + + kafka_cluster { + id = confluent_kafka_cluster.kafka_cluster.id + } + rest_endpoint = confluent_kafka_cluster.kafka_cluster.rest_endpoint + 
credentials { + key = confluent_api_key.app-manager-kafka-api-key.id + secret = confluent_api_key.app-manager-kafka-api-key.secret + } + + partitions_count = 10 + + lifecycle { + prevent_destroy = false + } +} + +resource "confluent_subject_config" "membership_value_avro" { + subject_name = "${confluent_kafka_topic.membership_avro.topic_name}-value" + + schema_registry_cluster { + id = data.confluent_schema_registry_cluster.advanced.id + } + rest_endpoint = data.confluent_schema_registry_cluster.advanced.rest_endpoint + + credentials { + key = confluent_api_key.env-manager-schema-registry-api-key.id + secret = confluent_api_key.env-manager-schema-registry-api-key.secret + } + + compatibility_level = "BACKWARD" + compatibility_group = "major_version" + + lifecycle { + prevent_destroy = false + } +} diff --git a/data-contracts/cc-terraform/outputs.tf b/data-contracts/cc-terraform/outputs.tf new file mode 100644 index 00000000..889caaf1 --- /dev/null +++ b/data-contracts/cc-terraform/outputs.tf @@ -0,0 +1,35 @@ +output "bootstrap_servers" { + value = replace(confluent_kafka_cluster.kafka_cluster.bootstrap_endpoint, "SASL_SSL://", "") +} + +output "security_protocol" { + value = "SASL_SSL" +} + +output "sasl_mechanism" { + value = "PLAIN" +} + +output "sasl_jaas_config" { + value = "org.apache.kafka.common.security.plain.PlainLoginModule required username='${confluent_api_key.app-manager-kafka-api-key.id}' password='${nonsensitive(confluent_api_key.app-manager-kafka-api-key.secret)}';" +} + +output "client_dns_lookup" { + value = "use_all_dns_ips" +} + +output "schema_registry_url" { + value = data.confluent_schema_registry_cluster.advanced.rest_endpoint +} + +output "basic_auth_credentials_source" { + value = "USER_INFO" +} + +output "basic_auth_user_info" { + value = "${confluent_api_key.env-manager-schema-registry-api-key.id}:${nonsensitive(confluent_api_key.env-manager-schema-registry-api-key.secret)}" +} + +output "auto_register_schemas" { + value = false +} diff --git a/data-contracts/cc-terraform/variables.tf b/data-contracts/cc-terraform/variables.tf new file mode 100644 index 00000000..33e386bc --- /dev/null +++ b/data-contracts/cc-terraform/variables.tf @@ -0,0 +1,27 @@ +variable "cloud_provider" { + type = string + description = "cloud provider for Confluent Cloud" + default = "AWS" +} + +variable "cloud_region" { + type = string + description = "cloud provider region" + default = "us-east-2" +} + +variable "cc_cluster_name" { + type = string + description = "name of kafka cluster" + default = "data-contracts-with-tf" +} + +variable "org_id" { + type = string +} + +variable "cc_env_display_name" { + type = string + description = "Name of Confluent Cloud Environment to Manage" + default = "tutorials-data-contracts-with-tf" +} diff --git a/data-contracts/images/overview.png b/data-contracts/images/overview.png new file mode 100644 index 00000000..41843109 Binary files /dev/null and b/data-contracts/images/overview.png differ diff --git a/data-contracts/schemas/build.gradle.kts b/data-contracts/schemas/build.gradle.kts new file mode 100644 index 00000000..391c7b19 --- /dev/null +++ b/data-contracts/schemas/build.gradle.kts @@ -0,0 +1,66 @@ +import java.io.FileInputStream +import java.util.Properties + +buildscript { + repositories { + mavenCentral() + maven("https://packages.confluent.io/maven/") + maven("https://jitpack.io") + gradlePluginPortal() + } +} + +plugins { + kotlin("jvm") version "2.0.21" + id("com.google.protobuf") version "0.9.4" + 
id("com.github.imflog.kafka-schema-registry-gradle-plugin") version "2.1.0" + id("com.bakdata.avro") version "1.2.1" +} + +group = "io.confluent.devrel" + +repositories { + mavenCentral() + maven("https://packages.confluent.io/maven/") + maven("https://jitpack.io") +} + +val schemaRegOutputDir = "${project.projectDir.absolutePath}/build/schema-registry-plugin" + +tasks.registerSchemasTask { + doFirst { + mkdir(schemaRegOutputDir) + } +} + +schemaRegistry { + val srProperties = Properties() + // At the moment, this is a file with which we are LOCALLY aware. + // In an ACTUAL CI/CD workflow, this would be externalized, perhaps provided from a base build image or other parameter. + srProperties.load(FileInputStream(File("${project.projectDir.absolutePath}/../shared/src/main/resources/confluent.properties"))) + + url = srProperties.getProperty("schema.registry.url") + + val srCredTokens = srProperties.get("basic.auth.user.info").toString().split(":") + credentials { + username = srCredTokens[0] + password = srCredTokens[1] + } + + outputDirectory = schemaRegOutputDir + pretty = true + + val baseBuildDir = "${project.projectDir}/src/main" + val avroSchemaDir = "$baseBuildDir/avro" + val rulesetDir = "$baseBuildDir/rulesets" + val metadataDir = "$baseBuildDir/metadata" + + register { + subject(inputSubject = "membership-avro-value", type = "AVRO", file = "$avroSchemaDir/membership_v1.avsc") + .setMetadata("$metadataDir/membership_major_version_1.json") + subject(inputSubject = "membership-avro-value", type = "AVRO", file = "$avroSchemaDir/membership_v2.avsc") + .addLocalReference("validity-period", "$avroSchemaDir/validity_period.avsc") + .setMetadata("$metadataDir/membership_major_version_2.json") + .setRuleSet("$rulesetDir/membership_migration_rules.json") + } +} diff --git a/data-contracts/schemas/gradle.properties b/data-contracts/schemas/gradle.properties new file mode 100644 index 00000000..3fd23b72 --- /dev/null +++ b/data-contracts/schemas/gradle.properties @@ -0,0 +1,6 @@ +confluentVersion=7.8.0 +fakerVersion=2.0.0-rc.3 +grpcVersion=1.15.1 +kafkaVersion=3.8.0 +protobufVersion=3.6.1 +slf4jVersion=2.0.11 diff --git a/data-contracts/schemas/settings.gradle.kts b/data-contracts/schemas/settings.gradle.kts new file mode 100644 index 00000000..529990ae --- /dev/null +++ b/data-contracts/schemas/settings.gradle.kts @@ -0,0 +1 @@ +rootProject.name="schemas" diff --git a/data-contracts/schemas/src/main/avro/membership_v1.avsc b/data-contracts/schemas/src/main/avro/membership_v1.avsc new file mode 100644 index 00000000..7768156e --- /dev/null +++ b/data-contracts/schemas/src/main/avro/membership_v1.avsc @@ -0,0 +1,10 @@ +{ + "name": "Membership", + "namespace": "io.confluent.devrel", + "type": "record", + "fields": [ + {"name": "user_id", "type": "string"}, + {"name": "start_date", "type": {"type": "int", "logicalType": "date"}}, + {"name": "end_date", "type": {"type": "int", "logicalType": "date"}} + ] +} \ No newline at end of file diff --git a/data-contracts/schemas/src/main/avro/membership_v2.avsc b/data-contracts/schemas/src/main/avro/membership_v2.avsc new file mode 100644 index 00000000..e347b255 --- /dev/null +++ b/data-contracts/schemas/src/main/avro/membership_v2.avsc @@ -0,0 +1,9 @@ +{ + "name": "Membership", + "namespace": "io.confluent.devrel", + "type": "record", + "fields": [ + { "name": "user_id", "type": "string" }, + { "name": "validity_period", "type": "io.confluent.devrel.ValidityPeriod" } + ] +} \ No newline at end of file diff --git 
a/data-contracts/schemas/src/main/avro/validity_period.avsc b/data-contracts/schemas/src/main/avro/validity_period.avsc new file mode 100644 index 00000000..4f3f3f88 --- /dev/null +++ b/data-contracts/schemas/src/main/avro/validity_period.avsc @@ -0,0 +1,10 @@ +{ + "name": "ValidityPeriod", + "namespace": "io.confluent.devrel", + "type": "record", + "fields": [ + {"name": "from", "type": {"type": "int", "logicalType": "date"}}, + {"name": "to", "type": {"type": "int", "logicalType": "date"} + } + ] +} \ No newline at end of file diff --git a/data-contracts/schemas/src/main/metadata/membership_major_version_1.json b/data-contracts/schemas/src/main/metadata/membership_major_version_1.json new file mode 100644 index 00000000..c546283b --- /dev/null +++ b/data-contracts/schemas/src/main/metadata/membership_major_version_1.json @@ -0,0 +1,5 @@ +{ + "properties": { + "major_version": 1 + } +} \ No newline at end of file diff --git a/data-contracts/schemas/src/main/metadata/membership_major_version_2.json b/data-contracts/schemas/src/main/metadata/membership_major_version_2.json new file mode 100644 index 00000000..8c043e30 --- /dev/null +++ b/data-contracts/schemas/src/main/metadata/membership_major_version_2.json @@ -0,0 +1,5 @@ +{ + "properties": { + "major_version": 2 + } +} \ No newline at end of file diff --git a/data-contracts/schemas/src/main/proto/.gitkeep b/data-contracts/schemas/src/main/proto/.gitkeep new file mode 100644 index 00000000..e69de29b diff --git a/data-contracts/schemas/src/main/rulesets/membership_migration_rules.json b/data-contracts/schemas/src/main/rulesets/membership_migration_rules.json new file mode 100644 index 00000000..1b55ad22 --- /dev/null +++ b/data-contracts/schemas/src/main/rulesets/membership_migration_rules.json @@ -0,0 +1,19 @@ +{ + "migrationRules": [ + { + "name": "move_start_and_end_date_to_validity_period", + "kind": "TRANSFORM", + "type": "JSONATA", + "mode": "UPGRADE", + "expr": "$merge([$sift($, function($v, $k) {$k != 'start_date' and $k != 'end_date'}), {'validity_period': {'from':start_date,'to':end_date}}])" + }, + { + "name": "move_validity_period_to_start_date_and_end_date", + "kind": "TRANSFORM", + "type": "JSONATA", + "mode": "DOWNGRADE", + "expr": "$merge([$sift($, function($v, $k) {$k != 'validity_period'}), {'start_date': validity_period.from, 'end_date': validity_period.to}])" + } + ] +} + diff --git a/data-contracts/shared/build.gradle.kts b/data-contracts/shared/build.gradle.kts new file mode 100644 index 00000000..e06da0cb --- /dev/null +++ b/data-contracts/shared/build.gradle.kts @@ -0,0 +1,48 @@ +buildscript { + repositories { + mavenCentral() + maven("https://packages.confluent.io/maven/") + maven("https://jitpack.io") + gradlePluginPortal() + } +} + +plugins { + kotlin("jvm") version "2.0.21" +} + +group = "io.confluent.devrel" + +repositories { + mavenCentral() + maven("https://packages.confluent.io/maven/") + maven("https://jitpack.io") +} + +dependencies { + implementation("org.slf4j:slf4j-api:${project.property("slf4jVersion")}") + implementation("org.slf4j:slf4j-simple:${project.property("slf4jVersion")}") + implementation("ch.qos.logback:logback-core:1.4.14") + + implementation("io.github.serpro69:kotlin-faker:${project.property("fakerVersion")}") + implementation("io.github.serpro69:kotlin-faker-books:${project.property("fakerVersion")}") + implementation("io.github.serpro69:kotlin-faker-tech:${project.property("fakerVersion")}") + + implementation("org.jetbrains.kotlinx:kotlinx-cli:0.3.6") + 
implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.6.1") + + implementation("io.grpc:grpc-stub:${project.property("grpcVersion")}") + implementation("io.grpc:grpc-protobuf:${project.property("grpcVersion")}") + implementation("com.google.protobuf:protobuf-java:${project.property("protobufVersion")}") + + implementation("org.apache.kafka:kafka-clients:3.8.0") + implementation("io.confluent:kafka-avro-serializer:${project.property("confluentVersion")}") + implementation("io.confluent:kafka-protobuf-serializer:${project.property("confluentVersion")}") + implementation("io.confluent:kafka-schema-rules:${project.property("confluentVersion")}") + + testImplementation("org.jetbrains.kotlin:kotlin-test") +} + +kotlin { + jvmToolchain(17) +} diff --git a/data-contracts/shared/gradle.properties b/data-contracts/shared/gradle.properties new file mode 100644 index 00000000..3fd23b72 --- /dev/null +++ b/data-contracts/shared/gradle.properties @@ -0,0 +1,6 @@ +confluentVersion=7.8.0 +fakerVersion=2.0.0-rc.3 +grpcVersion=1.15.1 +kafkaVersion=3.8.0 +protobufVersion=3.6.1 +slf4jVersion=2.0.11 diff --git a/data-contracts/shared/settings.gradle.kts b/data-contracts/shared/settings.gradle.kts new file mode 100644 index 00000000..b63b95eb --- /dev/null +++ b/data-contracts/shared/settings.gradle.kts @@ -0,0 +1 @@ +rootProject.name = "shared" \ No newline at end of file diff --git a/data-contracts/shared/src/main/kotlin/io/confluent/devrel/datacontracts/shared/BaseConsumer.kt b/data-contracts/shared/src/main/kotlin/io/confluent/devrel/datacontracts/shared/BaseConsumer.kt new file mode 100644 index 00000000..e738a1ca --- /dev/null +++ b/data-contracts/shared/src/main/kotlin/io/confluent/devrel/datacontracts/shared/BaseConsumer.kt @@ -0,0 +1,40 @@ +package io.confluent.devrel.datacontracts.shared + +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.slf4j.LoggerFactory +import java.time.Duration +import java.util.* + +abstract class BaseConsumer(propOverrides: Map = mapOf()) { + + companion object { + val logger = LoggerFactory.getLogger(BaseConsumer::class.java) + + fun getConsumerProperties(propOverrides: Map): Properties { + val props = Properties() + props.load(this::class.java.classLoader.getResourceAsStream("confluent.properties")) + props.put("specific.avro.reader", "true") + props.putAll(propOverrides) + return props + } + } + + val kafkaConsumer: KafkaConsumer = KafkaConsumer(getConsumerProperties(propOverrides)) + + fun start(topics: List) { + kafkaConsumer.subscribe(topics) + while (true) { + val records = kafkaConsumer.poll(Duration.ofSeconds(5)) + for (record in records) { + logger.trace("Record from ${record.topic()}, ${record.partition()}, ${record.offset()}") + consumeRecord(record.key(), record.value()) + } + } + } + + abstract fun consumeRecord(key: Key, value: Value) + + fun close() { + kafkaConsumer.close() + } +} \ No newline at end of file diff --git a/data-contracts/shared/src/main/kotlin/io/confluent/devrel/datacontracts/shared/BaseProducer.kt b/data-contracts/shared/src/main/kotlin/io/confluent/devrel/datacontracts/shared/BaseProducer.kt new file mode 100644 index 00000000..4c04c45c --- /dev/null +++ b/data-contracts/shared/src/main/kotlin/io/confluent/devrel/datacontracts/shared/BaseProducer.kt @@ -0,0 +1,42 @@ +package io.confluent.devrel.datacontracts.shared + +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.RecordMetadata +import org.slf4j.Logger +import 
org.slf4j.LoggerFactory +import java.util.* +import java.util.concurrent.Future + +abstract class BaseProducer<Key, Value>(propOverrides: Map<String, Any> = mapOf()) { + + companion object { + fun getProducerProperties(propOverrides: Map<String, Any>): Properties { + val props = Properties() + props.load(this::class.java.classLoader.getResourceAsStream("confluent.properties")) + props.putAll(propOverrides) + return props + } + + val logger: Logger = LoggerFactory.getLogger(javaClass) + } + + val kafkaProducer: KafkaProducer<Key, Value> = KafkaProducer(getProducerProperties(propOverrides)) + + open fun send(topicName: String, key: Key, value: Value): Future<RecordMetadata>? { + val record = ProducerRecord(topicName, key, value) + return kafkaProducer.send(record) { metadata: RecordMetadata?, exception: Exception? -> + if (exception != null) { + logger.error("Failed to send message: ${exception.message}") + exception.printStackTrace(System.err) + } + else if (metadata != null) { + logger.debug("Message sent successfully! Topic: ${metadata.topic()}, Partition: ${metadata.partition()}, Offset: ${metadata.offset()}") + } + } + } + + fun close() { + kafkaProducer.close() + } +} diff --git a/data-contracts/shared/src/main/kotlin/io/confluent/devrel/datacontracts/shared/ConfigLoader.kt b/data-contracts/shared/src/main/kotlin/io/confluent/devrel/datacontracts/shared/ConfigLoader.kt new file mode 100644 index 00000000..9ed4aa29 --- /dev/null +++ b/data-contracts/shared/src/main/kotlin/io/confluent/devrel/datacontracts/shared/ConfigLoader.kt @@ -0,0 +1,23 @@ +package io.confluent.devrel.datacontracts.shared + +import java.io.File +import java.io.InputStream +import java.util.Properties + +object ConfigLoader { + + fun loadPropsFromFile(path: String): Properties { + val properties = Properties() + val stream: InputStream? = File(path).inputStream() + stream.use { + println("reading properties from $path") + properties.load(stream) + } + + return properties + .mapKeys { it.key.toString() } + .mapValues { it.value.toString().replace("\"", "") } + .toProperties() + } + +} \ No newline at end of file diff --git a/data-contracts/shared/src/main/resources/.gitignore b/data-contracts/shared/src/main/resources/.gitignore new file mode 100644 index 00000000..37ecdf22 --- /dev/null +++ b/data-contracts/shared/src/main/resources/.gitignore @@ -0,0 +1 @@ +confluent.properties \ No newline at end of file diff --git a/data-contracts/shared/src/main/resources/confluent.properties.orig b/data-contracts/shared/src/main/resources/confluent.properties.orig new file mode 100644 index 00000000..7257a098 --- /dev/null +++ b/data-contracts/shared/src/main/resources/confluent.properties.orig @@ -0,0 +1,19 @@ +# Required connection configs for Kafka Streams +bootstrap.servers= + +security.protocol=SASL_SSL +sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='' password=''; +sasl.mechanism=PLAIN +# Required for correctness in Apache Kafka clients prior to 2.6 +client.dns.lookup=use_all_dns_ips + +# Best practice for higher availability in Apache Kafka clients prior to 3.0 +session.timeout.ms=45000 + +# Best practice for Kafka producer to prevent data loss +acks=all + +# Required connection configs for Confluent Cloud Schema Registry +schema.registry.url= +basic.auth.credentials.source=USER_INFO +basic.auth.user.info=: diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 1af9e093..cea7a793 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.12-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/gradlew.bat b/gradlew.bat index 93e3f59f..6689b85b 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -1,92 +1,92 @@ -@rem -@rem Copyright 2015 the original author or authors. -@rem -@rem Licensed under the Apache License, Version 2.0 (the "License"); -@rem you may not use this file except in compliance with the License. -@rem You may obtain a copy of the License at -@rem -@rem https://www.apache.org/licenses/LICENSE-2.0 -@rem -@rem Unless required by applicable law or agreed to in writing, software -@rem distributed under the License is distributed on an "AS IS" BASIS, -@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -@rem See the License for the specific language governing permissions and -@rem limitations under the License. -@rem - -@if "%DEBUG%"=="" @echo off -@rem ########################################################################## -@rem -@rem Gradle startup script for Windows -@rem -@rem ########################################################################## - -@rem Set local scope for the variables with windows NT shell -if "%OS%"=="Windows_NT" setlocal - -set DIRNAME=%~dp0 -if "%DIRNAME%"=="" set DIRNAME=. -@rem This is normally unused -set APP_BASE_NAME=%~n0 -set APP_HOME=%DIRNAME% - -@rem Resolve any "." and ".." in APP_HOME to make it shorter. -for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi - -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" - -@rem Find java.exe -if defined JAVA_HOME goto findJavaFromJavaHome - -set JAVA_EXE=java.exe -%JAVA_EXE% -version >NUL 2>&1 -if %ERRORLEVEL% equ 0 goto execute - -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:findJavaFromJavaHome -set JAVA_HOME=%JAVA_HOME:"=% -set JAVA_EXE=%JAVA_HOME%/bin/java.exe - -if exist "%JAVA_EXE%" goto execute - -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:execute -@rem Setup the command line - -set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar - - -@rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* - -:end -@rem End local scope for the variables with windows NT shell -if %ERRORLEVEL% equ 0 goto mainEnd - -:fail -rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of -rem the _cmd.exe /c_ return code! -set EXIT_CODE=%ERRORLEVEL% -if %EXIT_CODE% equ 0 set EXIT_CODE=1 -if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% -exit /b %EXIT_CODE% - -:mainEnd -if "%OS%"=="Windows_NT" endlocal - -:omega +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. 
+@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if %ERRORLEVEL% equ 0 goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! 
+set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/over-aggregations/flinksql/build.gradle b/over-aggregations/flinksql/build.gradle index 15d9a151..bcf54025 100644 --- a/over-aggregations/flinksql/build.gradle +++ b/over-aggregations/flinksql/build.gradle @@ -23,8 +23,8 @@ dependencies { testImplementation project(path: ':common', configuration: 'testArtifacts') testImplementation 'com.google.guava:guava:31.1-jre' testImplementation 'junit:junit:4.13.2' - testImplementation 'org.testcontainers:testcontainers:1.19.3' - testImplementation 'org.testcontainers:kafka:1.19.3' + testImplementation 'org.testcontainers:testcontainers:1.20.1' + testImplementation 'org.testcontainers:kafka:1.20.1' testImplementation 'commons-codec:commons-codec:1.17.0' testImplementation 'org.apache.flink:flink-sql-connector-kafka:3.2.0-1.19' testImplementation 'org.apache.flink:flink-connector-base:1.19.1' diff --git a/settings.gradle b/settings.gradle index 94e04309..97be1b29 100644 --- a/settings.gradle +++ b/settings.gradle @@ -21,6 +21,10 @@ include 'confluent-parallel-consumer-application:kafka' include 'common' include 'creating-first-apache-kafka-streams-application:kstreams' include 'cumulating-windows:flinksql' +include 'data-contracts:shared' +include 'data-contracts:schemas' +include 'data-contracts:app-schema-v1' +include 'data-contracts:app-schema-v2' include 'deduplication:flinksql' include 'deduplication-windowed:flinksql' include 'deduplication-windowed:kstreams' diff --git a/udf/ksql/build.gradle b/udf/ksql/build.gradle index d0138dd6..0aeb25f6 100644 --- a/udf/ksql/build.gradle +++ b/udf/ksql/build.gradle @@ -10,8 +10,8 @@ plugins { } java { - sourceCompatibility = JavaVersion.VERSION_11 - targetCompatibility = JavaVersion.VERSION_11 + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 }