Skip to content

Commit

Permalink
initial array type inference inspecting multiple messages
Browse files Browse the repository at this point in the history
  • Loading branch information
jakub014 committed Oct 8, 2020
1 parent 4ec1913 commit ebf9ada
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 53 deletions.
12 changes: 8 additions & 4 deletions src/main/scala/org/codefeedr/kafkaquery/parsers/Parser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import org.apache.zookeeper.KeeperException
import org.codefeedr.kafkaquery.commands.QueryCommand
import org.codefeedr.kafkaquery.parsers.Configurations.{Config, Mode}
import org.codefeedr.kafkaquery.transforms.JsonToAvroSchema
import org.codefeedr.kafkaquery.util.ZookeeperSchemaExposer
import org.codefeedr.kafkaquery.util.{
KafkaRecordRetriever,
ZookeeperSchemaExposer
}
import scopt.OptionParser

class Parser extends OptionParser[Config]("codefeedr") {
Expand Down Expand Up @@ -225,10 +228,11 @@ class Parser extends OptionParser[Config]("codefeedr") {
* @param kafkaAddress address of the kafka instance where the topic is present
*/
def inferSchema(topicName: String, kafkaAddress: String): Unit = {
val record: String =
JsonToAvroSchema.retrieveLatestRecordFromTopic(topicName, kafkaAddress)
val recordRetriever = new KafkaRecordRetriever(topicName, kafkaAddress)
val record: String = recordRetriever.getNextRecord

val schema = JsonToAvroSchema.inferSchema(record, topicName)
val schema =
JsonToAvroSchema.inferSchema(record, topicName, recordRetriever)

updateSchema(topicName, schema.toString)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.SchemaBuilder.TypeBuilder
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.codefeedr.kafkaquery.util.KafkaRecordRetriever

import scala.collection.JavaConverters._

object JsonToAvroSchema {
Expand All @@ -17,40 +19,57 @@ object JsonToAvroSchema {
* Infers an Avro schema from a given JSON object.
* @param json json data to infer schema from
* @param name name of the data source
* @param retriever a record retriever
* @return inferred Avro Schema
*/
def inferSchema(json: String, name: String): Schema = {
def inferSchema(
json: String,
name: String,
retriever: KafkaRecordRetriever
): Schema = {
inferSchema(
new ObjectMapper().readTree(json),
SchemaBuilder.builder(),
name
name,
retriever
)
}

private def inferSchema[T](
def inferSchema[T](
node: JsonNode,
schema: TypeBuilder[T],
name: String,
retriever: KafkaRecordRetriever,
namespace: String = "infer"
): T =
node.getNodeType match {
case JsonNodeType.ARRAY =>
val it = node.iterator()

if (!it.hasNext)
throw new IllegalArgumentException(
"Could not infer schema of empty array."
)
return findArrayType(name, retriever, schema)

val nodeName = validName(name)

val arrayElemSchema =
inferSchema(it.next(), SchemaBuilder.builder(), nodeName, namespace)
inferSchema(
it.next(),
SchemaBuilder.builder(),
nodeName,
retriever,
namespace
)

it.forEachRemaining(x =>
if (
!arrayElemSchema.equals(
inferSchema(x, SchemaBuilder.builder(), nodeName, namespace)
inferSchema(
x,
SchemaBuilder.builder(),
nodeName,
retriever,
namespace
)
)
)
throw new IllegalArgumentException(
Expand All @@ -73,6 +92,7 @@ object JsonToAvroSchema {
x.getValue,
SchemaBuilder.builder(),
fieldName,
retriever,
namespace + '.' + nodeName
)
)
Expand All @@ -90,45 +110,19 @@ object JsonToAvroSchema {
case JsonNodeType.NULL | JsonNodeType.MISSING => schema.nullType()
}

/**
* Gets the latest record from the specified topic.
* @param topicName name of the topic to fetch record from
* @param kafkaAddress address of Kafka instance
* @return string with the value of the last record
*/
def retrieveLatestRecordFromTopic(
topicName: String,
kafkaAddress: String
): String = {
val props = new Properties()
props.put("bootstrap.servers", kafkaAddress)
props.put(
"key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"
)
props.put(
"value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"
private def findArrayType[T](
arrayName: String,
retriever: KafkaRecordRetriever,
schema: TypeBuilder[T]
): T = {
val recordString = retriever.getNextRecord
val nextRecordNode = new ObjectMapper().readTree(recordString)
inferSchema(
nextRecordNode.findPath(arrayName),
schema,
arrayName,
retriever
)

val kafkaConsumer = new KafkaConsumer[String, String](props)

val partitions = kafkaConsumer
.partitionsFor(topicName)
.asScala
.map(x => new TopicPartition(x.topic(), x.partition()))

kafkaConsumer.assign(partitions.asJava)

kafkaConsumer.seekToEnd(List().asJava)

val latestPartAndPos =
partitions.map(x => (x, kafkaConsumer.position(x))).maxBy(_._2)
kafkaConsumer.seek(latestPartAndPos._1, latestPartAndPos._2 - 1)

val records = kafkaConsumer.poll(Duration.ofMillis(100))

records.iterator().next().value()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class KafkaRecordRetriever(topicName: String, kafkaAddress: String) {
private val partBegOffsetMap =
kafkaConsumer.beginningOffsets(partitions).asScala

/**
* Retrieves the next record in reverse order.
* @return the next record that has not been retrieved yet
*/
def getNextRecord: String = {

val maxOffsetKey =
Expand All @@ -56,7 +60,7 @@ class KafkaRecordRetriever(topicName: String, kafkaAddress: String) {
kafkaConsumer.seek(maxOffsetKey, partEndOffsetMap(maxOffsetKey))

val records: ConsumerRecords[String, String] =
kafkaConsumer.poll(Duration.ofMillis(100))
kafkaConsumer.poll(Duration.ofMillis(2000))

records.iterator().next().value()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class JsonToAvroSchemaTest extends AnyFunSuite with TableDrivenPropertyChecks {
*/
forAll(testData) { (avroSchema: String, jsonSample: String) =>
assertResult(new Schema.Parser().parse(avroSchema)) {
JsonToAvroSchema.inferSchema(jsonSample, topicName)
JsonToAvroSchema.inferSchema(jsonSample, topicName, null)
}
}

Expand Down Expand Up @@ -244,7 +244,7 @@ class JsonToAvroSchemaTest extends AnyFunSuite with TableDrivenPropertyChecks {
*/
forAll(exceptionalTestData) {jsonSample: String =>
assertThrows[IllegalArgumentException] {
JsonToAvroSchema.inferSchema(jsonSample, topicName)
JsonToAvroSchema.inferSchema(jsonSample, topicName, null)
}
}
}

0 comments on commit ebf9ada

Please sign in to comment.