From e468984c3be82212dfc670fa2eed9ccf5eb3b187 Mon Sep 17 00:00:00 2001 From: Matthew Mihic Date: Wed, 26 Sep 2018 15:55:10 -0400 Subject: [PATCH] Event interfaces --- misk-events/build.gradle | 54 +++++++++++++++++ misk-events/gradle.properties | 4 ++ .../src/main/kotlin/misk/events/Consumer.kt | 21 +++++++ .../src/main/kotlin/misk/events/Event.kt | 58 +++++++++++++++++++ .../src/main/kotlin/misk/events/Producer.kt | 28 +++++++++ .../kotlin/misk/events/SpooledProducer.kt | 17 ++++++ .../src/main/kotlin/misk/events/Topic.kt | 3 + settings.gradle | 1 + 8 files changed, 186 insertions(+) create mode 100644 misk-events/build.gradle create mode 100644 misk-events/gradle.properties create mode 100644 misk-events/src/main/kotlin/misk/events/Consumer.kt create mode 100644 misk-events/src/main/kotlin/misk/events/Event.kt create mode 100644 misk-events/src/main/kotlin/misk/events/Producer.kt create mode 100644 misk-events/src/main/kotlin/misk/events/SpooledProducer.kt create mode 100644 misk-events/src/main/kotlin/misk/events/Topic.kt diff --git a/misk-events/build.gradle b/misk-events/build.gradle new file mode 100644 index 00000000000..591f473a421 --- /dev/null +++ b/misk-events/build.gradle @@ -0,0 +1,54 @@ +import org.junit.platform.console.options.Details + +buildscript { + dependencies { + classpath dep.kotlinNoArgPlugin + } +} + +apply plugin: 'kotlin' +apply plugin: 'kotlin-jpa' +apply plugin: 'org.junit.platform.gradle.plugin' +apply plugin: "com.vanniktech.maven.publish" + +compileKotlin { + kotlinOptions { + jvmTarget = "1.8" + allWarningsAsErrors = true + } +} +compileTestKotlin { + kotlinOptions { + jvmTarget = "1.8" + allWarningsAsErrors = true + } +} + +sourceSets { + main.java.srcDirs += 'src/main/kotlin/' + test.java.srcDirs += 'src/test/kotlin/' +} + +junitPlatform { + details Details.VERBOSE +} + +dependencies { + compile dep.hibernateCore + compile dep.hikariCp + compile dep.hsqldb + compile dep.mysql + compile dep.openTracing + compile dep.openTracingUtil + compile dep.openTracingJdbc + compile dep.vitess + compile project(':misk') + compile project(':misk-hibernate') + + testCompile project(':misk-testing') + testCompile project(':misk-hibernate-testing') +} + +if (rootProject.file("hooks.gradle").exists()) { + apply from: rootProject.file("hooks.gradle") +} diff --git a/misk-events/gradle.properties b/misk-events/gradle.properties new file mode 100644 index 00000000000..e162fb85a56 --- /dev/null +++ b/misk-events/gradle.properties @@ -0,0 +1,4 @@ +POM_ARTIFACT_ID=misk-events +POM_NAME=misk-events +POM_DESCRIPTION=A Misk module for handling event streams +POM_PACKAGING=jar diff --git a/misk-events/src/main/kotlin/misk/events/Consumer.kt b/misk-events/src/main/kotlin/misk/events/Consumer.kt new file mode 100644 index 00000000000..962dcc7ee64 --- /dev/null +++ b/misk-events/src/main/kotlin/misk/events/Consumer.kt @@ -0,0 +1,21 @@ +package misk.events + +/** A [Consumer] allows applications to receive events from a source */ +interface Consumer { + /** The [Context] provides information about a set of events being consumed */ + interface Context { + /** the topic from which events are being received */ + val topic: Topic + + /** Defers processing a set of events to a later time, typically placing them on a retry queue */ + fun retryLater(vararg events: Event) + } + + /** A [Handler] handles incoming events from a topic */ + interface Handler { + fun handleEvents(ctx: Context, vararg events: Event) + } + + /** listens for incoming events to a topic */ + fun subscribe(topic: Topic, handler: Handler) +} \ No newline at end of file diff --git a/misk-events/src/main/kotlin/misk/events/Event.kt b/misk-events/src/main/kotlin/misk/events/Event.kt new file mode 100644 index 00000000000..afcf6afe2d0 --- /dev/null +++ b/misk-events/src/main/kotlin/misk/events/Event.kt @@ -0,0 +1,58 @@ +package misk.events + +import com.squareup.wire.Message +import com.squareup.wire.ProtoAdapter +import okio.ByteString +import okio.ByteString.Companion.encodeUtf8 +import java.time.Instant + +data class Event( + /** The type of event */ + val type: String, + + /** the content so the event, encoded as a protobuf */ + val body: ByteString, + + /** the instant at which the event occurred */ + val occurredAt: Instant, + + /** a global unique id for the event */ + val id: ByteString, + + /** + * The id of the entity to which the event is referencing. Many but not all events + * are correlated with a specific entity; if this event is related to an entity, + * the entity_identifier should specify the id of that entity + */ + val entityIdentifier: String = "", + + /** + * Partitioning key for the event. The partitioning key controls the ordering and sharding + * of events on a topic. Events on a topic with the same partitioning key are delivered on + * the same shard and in the order in which they were published. Typically, the entity + * identifier is also used as a partitioning key, such that all of events on a topic for + * a given entity get delivered in the order in which they were submitted. However, + * producing applications may include an alternate partition key as part of the event + * to support ordering/sharding at a level above the individual entity. For example, + * a card processing system may want to shard and order all credit card changes relative + * to the customer to whom the card belongs; in this case the entity identifier is the credit + * card modified by the event, but the partition key is the token of the customer owning the card. + */ + val partitionKey: ByteString = if (entityIdentifier.isBlank()) id else entityIdentifier.encodeUtf8(), + + /** + * Additional context information about the event, added and examined by infrastructure elements + */ + val headers: Map = mapOf() +) { + fun > bodyAs(adapter: ProtoAdapter): A = adapter.decode(body) + + inline fun > bodyAs(): A = bodyAs(ProtoAdapter.get(A::class.java)) + + fun > header(name: String, adapter: ProtoAdapter): A? = + headers[name]?.let { adapter.decode(it) } + + inline fun > header(name: String) = + header(name, ProtoAdapter.get(A::class.java)) +} + diff --git a/misk-events/src/main/kotlin/misk/events/Producer.kt b/misk-events/src/main/kotlin/misk/events/Producer.kt new file mode 100644 index 00000000000..3156b753f96 --- /dev/null +++ b/misk-events/src/main/kotlin/misk/events/Producer.kt @@ -0,0 +1,28 @@ +package misk.events + +/** + * A [Producer] is used to send events to an event stream. + */ +interface Producer { + /** + * A producer [Transaction] is a unit of publishing. Producer transactions adhere to the + * transactional concepts of isolation (events publishing within a transaction are only + * visible to consumers once the transaction is committed) and atomicity (once committed, all + * events within the transactions are made available to consumers - no events will be lost). + * + * Note that a Producer transaction is _not_ connected to a database transaction; it is solely + * a transaction within the event streaming system. To coordinate event publishing with + * local database transactions, use a [SpooledProducer] which stores the events in a local table + * as part of the local database transaction. + * + * Transactions remaining outstanding until the application calls commit, or rollback, or until + * a producer specific timeout occurs. + */ + interface Transaction { + fun publish(topic: Topic, vararg events: Event) + fun commit() + fun rollback() + } + + fun beginTransaction(): Transaction +} \ No newline at end of file diff --git a/misk-events/src/main/kotlin/misk/events/SpooledProducer.kt b/misk-events/src/main/kotlin/misk/events/SpooledProducer.kt new file mode 100644 index 00000000000..750c5052a22 --- /dev/null +++ b/misk-events/src/main/kotlin/misk/events/SpooledProducer.kt @@ -0,0 +1,17 @@ +package misk.events + +import misk.hibernate.Gid +import misk.hibernate.Session + +/** + * A [SpooledProducer] is a producer that writes events to a local spool stored within a + * service's database. [SpooledProducer]s can be used to coordinate event publishing with + * local database transactions. Events published to the pool are done within the application's + * local database transaction; a rollback of the database transaction will also rollback + * any events published to the spool. Events are asynchronously forwarded from the spool + * to the event stream, and are done so through a [Producer] transaction. + */ +interface SpooledProducer { + fun publish(session: Session, groupRootId: Gid<*, *>, topic: Topic, vararg event: Event) + fun publish(session: Session, topic: Topic, vararg event: Event) +} \ No newline at end of file diff --git a/misk-events/src/main/kotlin/misk/events/Topic.kt b/misk-events/src/main/kotlin/misk/events/Topic.kt new file mode 100644 index 00000000000..5e24bfc5e2c --- /dev/null +++ b/misk-events/src/main/kotlin/misk/events/Topic.kt @@ -0,0 +1,3 @@ +package misk.events + +data class Topic(val name: String) \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index e31ac31812a..281a8dd822d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -10,6 +10,7 @@ include ':misk-prometheus' include ':misk-testing' include ':misk-zipkin' include ':misk-eventrouter' +include ':misk-events' include ':samples:exemplar' include ':samples:exemplarchat' include ':samples:urlshortener'