-
Notifications
You must be signed in to change notification settings - Fork 97
/
SchemaSuite.scala
83 lines (70 loc) · 2.59 KB
/
SchemaSuite.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/*
* Copyright 2018-2022 OVO Energy Limited
*
* SPDX-License-Identifier: Apache-2.0
*/
package fs2.kafka.vulcan.testkit
import fs2.kafka.vulcan.SchemaRegistryClientSettings
import munit.FunSuite
import vulcan.Codec
import org.apache.avro.SchemaCompatibility
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import org.apache.avro.Schema
/**
  * Checks Avro schema compatibility between a vulcan [[vulcan.Codec]] and the schema
  * identified by a subject name.
  *
  * Both operations return the Avro `SchemaPairCompatibility` result wrapped in `F`; how the
  * subject is resolved to a schema is left to the implementation.
  *
  * @tparam F the effect type in which the compatibility result is produced
  */
trait CompatibilityChecker[F[_]] {
  import SchemaCompatibility.SchemaPairCompatibility

  /**
    * Checks compatibility with the codec's schema in the reader role.
    *
    * @param reader        codec whose schema is used as the reader schema
    * @param writerSubject subject identifying the writer schema
    * @tparam A the type handled by `reader`
    * @return the compatibility result for the (reader, writer) schema pair
    */
  def checkReaderCompatibility[A](
    reader: Codec[A],
    writerSubject: String
  ): F[SchemaPairCompatibility]

  /**
    * Checks compatibility with the codec's schema in the writer role.
    *
    * @param writer        codec whose schema is used as the writer schema
    * @param readerSubject subject identifying the reader schema
    * @tparam A the type handled by `writer`
    * @return the compatibility result for the (reader, writer) schema pair
    */
  def checkWriterCompatibility[A](
    writer: Codec[A],
    readerSubject: String
  ): F[SchemaPairCompatibility]
}
/**
  * MUnit mixin providing a suite-level fixture that checks Avro schema compatibility between
  * vulcan codecs and the latest schema registered under a schema-registry subject.
  */
trait SchemaSuite extends FunSuite {

  /**
    * Extracts the Avro schema from `codec`, failing the current test with the codec's error
    * message when the codec does not carry a valid schema.
    */
  private def codecAsSchema[A](codec: Codec[A]): Schema =
    codec.schema.fold(e => fail(e.message), ok => ok)

  /**
    * Builds a fixture exposing a [[CompatibilityChecker]] backed by a schema registry client.
    *
    * The client is created from `clientSettings` in `beforeAll`; until `beforeAll` has run,
    * `apply()` returns `null`.
    *
    * @param clientSettings settings used to create the schema registry client
    * @param name           fixture name passed to the MUnit `Fixture` constructor
    * @return a fixture yielding the compatibility checker
    */
  def compatibilityChecker(
    clientSettings: SchemaRegistryClientSettings[IO],
    name: String = "schema-compatibility-checker"
  ): Fixture[CompatibilityChecker[IO]] =
    new Fixture[CompatibilityChecker[IO]](name) {
      // Assigned once in beforeAll; a mutable null-initialised slot because the fixture
      // value can only be produced after the suite has started.
      private var checker: CompatibilityChecker[IO] = null

      override def apply(): CompatibilityChecker[IO] = checker

      override def beforeAll(): Unit =
        checker = clientSettings.createSchemaRegistryClient
          .map { client =>
            new CompatibilityChecker[IO] {

              /**
                * Fetches the latest schema registered under `subject`.
                *
                * The registry client calls are synchronous network I/O, so they are run
                * with `IO.blocking` rather than on the compute pool.
                */
              private def registrySchema(subject: String): IO[Schema] =
                for {
                  metadata <- IO.blocking(client.getLatestSchemaMetadata(subject))
                  // The cast fails with ClassCastException if the subject does not hold
                  // an Avro schema; the failure is captured in the IO.
                  schema <- IO.blocking(
                    client.getSchemaById(metadata.getId).asInstanceOf[AvroSchema]
                  )
                } yield schema.rawSchema()

              /**
                * Resolves the codec schema first, then the registry schema, and combines
                * them with `check`. The codec schema is resolved inside `IO` so that an
                * invalid codec produces a failed `IO` instead of throwing at call time.
                */
              private def compare[A](codec: Codec[A], subject: String)(
                check: (Schema, Schema) => SchemaCompatibility.SchemaPairCompatibility
              ): IO[SchemaCompatibility.SchemaPairCompatibility] =
                for {
                  codecSchema <- IO.delay(codecAsSchema(codec))
                  subjectSchema <- registrySchema(subject)
                } yield check(codecSchema, subjectSchema)

              /** Codec schema is the reader; the subject's latest schema is the writer. */
              def checkReaderCompatibility[A](
                reader: Codec[A],
                writerSubject: String
              ): IO[SchemaCompatibility.SchemaPairCompatibility] =
                compare(reader, writerSubject) { (readerSchema, writerSchema) =>
                  SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema)
                }

              /** Codec schema is the writer; the subject's latest schema is the reader. */
              def checkWriterCompatibility[A](
                writer: Codec[A],
                readerSubject: String
              ): IO[SchemaCompatibility.SchemaPairCompatibility] =
                compare(writer, readerSubject) { (writerSchema, readerSchema) =>
                  SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema)
                }
            }
          }
          .unsafeRunSync()
    }
}