From ebf9ada1d0fb143944171a76fa1ac471ca80215a Mon Sep 17 00:00:00 2001
From: Jakub Nguyen
Date: Thu, 8 Oct 2020 13:49:09 +0200
Subject: [PATCH] initial array type inference inspecting multiple messages

---
 .../codefeedr/kafkaquery/parsers/Parser.scala | 12 ++-
 .../transforms/JsonToAvroSchema.scala         | 86 +++++++++----------
 .../util/KafkaRecordRetriever.scala           |  6 +-
 .../transforms/JsonToAvroSchemaTest.scala     |  4 +-
 4 files changed, 55 insertions(+), 53 deletions(-)

diff --git a/src/main/scala/org/codefeedr/kafkaquery/parsers/Parser.scala b/src/main/scala/org/codefeedr/kafkaquery/parsers/Parser.scala
index 41b05f8..7ed8d75 100644
--- a/src/main/scala/org/codefeedr/kafkaquery/parsers/Parser.scala
+++ b/src/main/scala/org/codefeedr/kafkaquery/parsers/Parser.scala
@@ -8,7 +8,10 @@ import org.apache.zookeeper.KeeperException
 import org.codefeedr.kafkaquery.commands.QueryCommand
 import org.codefeedr.kafkaquery.parsers.Configurations.{Config, Mode}
 import org.codefeedr.kafkaquery.transforms.JsonToAvroSchema
-import org.codefeedr.kafkaquery.util.ZookeeperSchemaExposer
+import org.codefeedr.kafkaquery.util.{
+  KafkaRecordRetriever,
+  ZookeeperSchemaExposer
+}
 import scopt.OptionParser
 
 class Parser extends OptionParser[Config]("codefeedr") {
@@ -225,10 +228,11 @@ class Parser extends OptionParser[Config]("codefeedr") {
     * @param kafkaAddress address of the kafka instance where the topic is present
     */
   def inferSchema(topicName: String, kafkaAddress: String): Unit = {
-    val record: String =
-      JsonToAvroSchema.retrieveLatestRecordFromTopic(topicName, kafkaAddress)
+    val recordRetriever = new KafkaRecordRetriever(topicName, kafkaAddress)
+    val record: String = recordRetriever.getNextRecord
 
-    val schema = JsonToAvroSchema.inferSchema(record, topicName)
+    val schema =
+      JsonToAvroSchema.inferSchema(record, topicName, recordRetriever)
 
     updateSchema(topicName, schema.toString)
   }
diff --git a/src/main/scala/org/codefeedr/kafkaquery/transforms/JsonToAvroSchema.scala b/src/main/scala/org/codefeedr/kafkaquery/transforms/JsonToAvroSchema.scala
index fe6cc75..3f291d1 100644
--- a/src/main/scala/org/codefeedr/kafkaquery/transforms/JsonToAvroSchema.scala
+++ b/src/main/scala/org/codefeedr/kafkaquery/transforms/JsonToAvroSchema.scala
@@ -9,6 +9,8 @@ import org.apache.avro.{Schema, SchemaBuilder}
 import org.apache.avro.SchemaBuilder.TypeBuilder
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.TopicPartition
+import org.codefeedr.kafkaquery.util.KafkaRecordRetriever
+
 import scala.collection.JavaConverters._
 
 object JsonToAvroSchema {
@@ -17,39 +19,56 @@ object JsonToAvroSchema {
   /**
     * Infers an Avro schema from a given JSON object.
     * @param json json data to infer schema from
     * @param name name of the data source
+    * @param retriever retriever used to fetch additional records from the topic
     * @return inferred Avro Schema
     */
-  def inferSchema(json: String, name: String): Schema = {
+  def inferSchema(
+      json: String,
+      name: String,
+      retriever: KafkaRecordRetriever
+  ): Schema = {
     inferSchema(
       new ObjectMapper().readTree(json),
       SchemaBuilder.builder(),
-      name
+      name,
+      retriever
     )
   }
 
-  private def inferSchema[T](
+  def inferSchema[T](
      node: JsonNode,
      schema: TypeBuilder[T],
      name: String,
+     retriever: KafkaRecordRetriever,
      namespace: String = "infer"
   ): T =
     node.getNodeType match {
       case JsonNodeType.ARRAY =>
         val it = node.iterator()
         if (!it.hasNext)
-          throw new IllegalArgumentException(
-            "Could not infer schema of empty array."
-          )
+          return findArrayType(name, retriever, schema)
 
         val nodeName = validName(name)
 
         val arrayElemSchema =
-          inferSchema(it.next(), SchemaBuilder.builder(), nodeName, namespace)
+          inferSchema(
+            it.next(),
+            SchemaBuilder.builder(),
+            nodeName,
+            retriever,
+            namespace
+          )
         it.forEachRemaining(x =>
           if (
             !arrayElemSchema.equals(
-              inferSchema(x, SchemaBuilder.builder(), nodeName, namespace)
+              inferSchema(
+                x,
+                SchemaBuilder.builder(),
+                nodeName,
+                retriever,
+                namespace
+              )
             )
           )
             throw new IllegalArgumentException(
@@ -73,6 +92,7 @@
                 x.getValue,
                 SchemaBuilder.builder(),
                 fieldName,
+                retriever,
                 namespace + '.' + nodeName
               )
             )
@@ -90,45 +110,19 @@
       case JsonNodeType.NULL | JsonNodeType.MISSING => schema.nullType()
     }
 
-  /**
-    * Gets the latest record from the specified topic.
-    * @param topicName name of the topic to fetch record from
-    * @param kafkaAddress address of Kafka instance
-    * @return string with the value of the last record
-    */
-  def retrieveLatestRecordFromTopic(
-      topicName: String,
-      kafkaAddress: String
-  ): String = {
-    val props = new Properties()
-    props.put("bootstrap.servers", kafkaAddress)
-    props.put(
-      "key.deserializer",
-      "org.apache.kafka.common.serialization.StringDeserializer"
-    )
-    props.put(
-      "value.deserializer",
-      "org.apache.kafka.common.serialization.StringDeserializer"
+  private def findArrayType[T](
+      arrayName: String,
+      retriever: KafkaRecordRetriever,
+      schema: TypeBuilder[T]
+  ): T = {
+    val recordString = retriever.getNextRecord
+    val nextRecordNode = new ObjectMapper().readTree(recordString)
+    inferSchema(
+      nextRecordNode.findPath(arrayName),
+      schema,
+      arrayName,
+      retriever
     )
-
-    val kafkaConsumer = new KafkaConsumer[String, String](props)
-
-    val partitions = kafkaConsumer
-      .partitionsFor(topicName)
-      .asScala
-      .map(x => new TopicPartition(x.topic(), x.partition()))
-
-    kafkaConsumer.assign(partitions.asJava)
-
-    kafkaConsumer.seekToEnd(List().asJava)
-
-    val latestPartAndPos =
-      partitions.map(x => (x, kafkaConsumer.position(x))).maxBy(_._2)
-    kafkaConsumer.seek(latestPartAndPos._1, latestPartAndPos._2 - 1)
-
-    val records = kafkaConsumer.poll(Duration.ofMillis(100))
-
-    records.iterator().next().value()
   }
 
   /**
diff --git a/src/main/scala/org/codefeedr/kafkaquery/util/KafkaRecordRetriever.scala b/src/main/scala/org/codefeedr/kafkaquery/util/KafkaRecordRetriever.scala
index 92d7013..21ed8b6 100644
--- a/src/main/scala/org/codefeedr/kafkaquery/util/KafkaRecordRetriever.scala
+++ b/src/main/scala/org/codefeedr/kafkaquery/util/KafkaRecordRetriever.scala
@@ -41,6 +41,10 @@ class KafkaRecordRetriever(topicName: String, kafkaAddress: String) {
   private val partBegOffsetMap =
     kafkaConsumer.beginningOffsets(partitions).asScala
 
+  /**
+    * Retrieves the next record from the topic, iterating in reverse order.
+    * @return the next record that has not been retrieved yet
+    */
   def getNextRecord: String = {
 
     val maxOffsetKey =
@@ -56,7 +60,7 @@ class KafkaRecordRetriever(topicName: String, kafkaAddress: String) {
     kafkaConsumer.seek(maxOffsetKey, partEndOffsetMap(maxOffsetKey))
 
     val records: ConsumerRecords[String, String] =
-      kafkaConsumer.poll(Duration.ofMillis(100))
+      kafkaConsumer.poll(Duration.ofMillis(2000))
 
     records.iterator().next().value()
   }
diff --git a/src/test/scala/org/codefeedr/kafkaquery/transforms/JsonToAvroSchemaTest.scala b/src/test/scala/org/codefeedr/kafkaquery/transforms/JsonToAvroSchemaTest.scala
index e63c8d8..d2d3611 100644
--- a/src/test/scala/org/codefeedr/kafkaquery/transforms/JsonToAvroSchemaTest.scala
+++ b/src/test/scala/org/codefeedr/kafkaquery/transforms/JsonToAvroSchemaTest.scala
@@ -215,7 +215,7 @@ class JsonToAvroSchemaTest extends AnyFunSuite with TableDrivenPropertyChecks {
    */
   forAll(testData) { (avroSchema: String, jsonSample: String) =>
     assertResult(new Schema.Parser().parse(avroSchema)) {
-      JsonToAvroSchema.inferSchema(jsonSample, topicName)
+      JsonToAvroSchema.inferSchema(jsonSample, topicName, null)
     }
   }
 
@@ -244,7 +244,7 @@ class JsonToAvroSchemaTest extends AnyFunSuite with TableDrivenPropertyChecks {
    */
   forAll(exceptionalTestData) {jsonSample: String =>
     assertThrows[IllegalArgumentException] {
-      JsonToAvroSchema.inferSchema(jsonSample, topicName)
+      JsonToAvroSchema.inferSchema(jsonSample, topicName, null)
     }
   }
 }
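
Usage sketch (not part of the patch itself): a minimal example of driving the retriever-backed inference end to end, mirroring Parser.inferSchema above. The topic name and broker address are placeholder values and assume a reachable Kafka instance with at least one JSON record on the topic.

import org.codefeedr.kafkaquery.transforms.JsonToAvroSchema
import org.codefeedr.kafkaquery.util.KafkaRecordRetriever

object InferSchemaExample {
  def main(args: Array[String]): Unit = {
    val topicName    = "pypi_releases"   // placeholder topic name
    val kafkaAddress = "localhost:9092"  // placeholder broker address

    // The retriever hands out records newest-first, one record per call.
    val retriever = new KafkaRecordRetriever(topicName, kafkaAddress)

    // Infer from the latest record; when an empty array is encountered,
    // inferSchema falls back to findArrayType, which pulls older records
    // through the same retriever until the array's element type is found.
    val schema =
      JsonToAvroSchema.inferSchema(retriever.getNextRecord, topicName, retriever)

    println(schema.toString(true))
  }
}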