Low performance while using latest as schema.id #105

Closed
agolovenko opened this issue Apr 8, 2020 · 15 comments

Comments

@agolovenko

Looks like SchemaLoader uses an uncached call to get the latest version id:

https://github.com/AbsaOSS/ABRiS/blob/master/src/main/scala/za/co/absa/abris/avro/schemas/SchemaLoader.scala#L103-L110

This happens quite often and results in a huge number of HTTP requests to the schema registry. The value could be cached for some period of time, and that period should be configurable.
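A minimal sketch of the proposed time-based cache, assuming a pluggable `fetchLatest: String => Int` function that stands in for the real registry lookup (the one `SchemaManager.getLatestVersionId` performs). `LatestVersionCache`, its constructor parameters and the injectable clock are hypothetical names for illustration, not part of ABRiS.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.duration._

// Hypothetical TTL cache for "latest version id" lookups, keyed by subject.
// `fetchLatest` is an assumed stand-in for the HTTP call to the registry;
// `now` is injectable so expiry can be tested without sleeping.
final class LatestVersionCache(
    ttl: FiniteDuration,
    fetchLatest: String => Int,
    now: () => Long = () => System.nanoTime()) {

  private final case class Entry(versionId: Int, fetchedAt: Long)
  private val entries = new ConcurrentHashMap[String, Entry]()

  def get(subject: String): Int = {
    val t = now()
    val cached = entries.get(subject)
    if (cached != null && t - cached.fetchedAt < ttl.toNanos) {
      cached.versionId // still fresh: no registry request
    } else {
      // Expired or missing: ask the registry again. Two threads missing at
      // the same time may both fetch; the last write wins, which is
      // acceptable for a sketch.
      val fresh = Entry(fetchLatest(subject), t)
      entries.put(subject, fresh)
      fresh.versionId
    }
  }
}
```

The `ttl` argument is where a configurable period would plug in; within that window, repeated lookups for the same subject never leave the JVM.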

@felipemmelo
Collaborator

@agolovenko, which version are you using? That used to be the case in the past, but in the newest version, 3.1.1, it should be invoked only once per executor.
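For illustration, "once per executor" caching can be sketched by holding the value in a JVM-wide Scala `object` rather than on an expression instance: an `object` is initialised once per class loader, so every instance and copy running in the same executor JVM shares it. `ExecutorSchemaIdCache` and `getOrFetch` are hypothetical names, not ABRiS API.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical JVM-wide cache of schema ids, keyed by subject name.
object ExecutorSchemaIdCache {
  private val ids = TrieMap.empty[String, Int]

  // `fetch` is by-name, so it is only evaluated when the subject is not
  // cached yet. Under contention it may run more than once, but only one
  // value ends up stored.
  def getOrFetch(subject: String)(fetch: => Int): Int =
    ids.getOrElseUpdate(subject, fetch)
}
```

Note that such a cache never expires, so a newly registered schema version would not be picked up until the executor restarts; a time-based expiry is one way around that.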

@agolovenko
Author

It is 3.1.1 with spark 2.4.5 and scala 2.11.12.

@agolovenko
Author

It is strange: this is supposed to be cached, but it is not: https://github.com/AbsaOSS/ABRiS/blob/master/src/main/scala/za/co/absa/abris/avro/sql/AvroDataToCatalyst.scala#L51-L55

Here's a typical trace:

"stream execution thread for [id = 098b31cd-b8e8-46fe-995b-164a1f660178, runId = 997b4487-20a4-40a0-9a6f-2a806e600618]@7827" daemon prio=5 tid=0x3a nid=NA runnable
  java.lang.Thread.State: RUNNABLE
	  at za.co.absa.abris.avro.read.confluent.SchemaManager$.getLatestVersionId(SchemaManager.scala:147)
	  at za.co.absa.abris.avro.schemas.SchemaLoader$.getSchemaId(SchemaLoader.scala:105)
	  at za.co.absa.abris.avro.schemas.SchemaLoader$.loadFromSchemaRegistry(SchemaLoader.scala:80)
	  at za.co.absa.abris.avro.schemas.SchemaLoader$.loadFromSchemaRegistryValue(SchemaLoader.scala:49)
	  at za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils$.loadForValue(AvroSchemaUtils.scala:53)
	  at za.co.absa.abris.avro.sql.AvroDataToCatalyst.loadSchemaFromRegistry(AvroDataToCatalyst.scala:130)
	  at za.co.absa.abris.avro.sql.AvroDataToCatalyst.avroSchema$lzycompute(AvroDataToCatalyst.scala:53)
	  - locked <0x2b88> (a za.co.absa.abris.avro.sql.AvroDataToCatalyst)
	  at za.co.absa.abris.avro.sql.AvroDataToCatalyst.avroSchema(AvroDataToCatalyst.scala:51)
	  at za.co.absa.abris.avro.sql.AvroDataToCatalyst.dataType$lzycompute(AvroDataToCatalyst.scala:47)
	  at za.co.absa.abris.avro.sql.AvroDataToCatalyst.dataType(AvroDataToCatalyst.scala:47)
	  at org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:176)
	  at org.apache.spark.sql.catalyst.optimizer.CollapseProject$$anonfun$collectAliases$1.applyOrElse(Optimizer.scala:670)
	  at org.apache.spark.sql.catalyst.optimizer.CollapseProject$$anonfun$collectAliases$1.applyOrElse(Optimizer.scala:669)
	  at scala.PartialFunction$$anonfun$runWith$1.apply(PartialFunction.scala:141)
	  at scala.PartialFunction$$anonfun$runWith$1.apply(PartialFunction.scala:140)
	  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	  at scala.collection.TraversableLike$class.collect(TraversableLike.scala:271)
	  at scala.collection.AbstractTraversable.collect(Traversable.scala:104)
	  at org.apache.spark.sql.catalyst.optimizer.CollapseProject$.collectAliases(Optimizer.scala:669)
	  at org.apache.spark.sql.catalyst.optimizer.CollapseProject$.org$apache$spark$sql$catalyst$optimizer$CollapseProject$$haveCommonNonDeterministicOutput(Optimizer.scala:678)
	  at org.apache.spark.sql.catalyst.optimizer.CollapseProject$$anonfun$apply$9.applyOrElse(Optimizer.scala:654)
	  at org.apache.spark.sql.catalyst.optimizer.CollapseProject$$anonfun$apply$9.applyOrElse(Optimizer.scala:652)
	  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:284)
	  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:284)
	  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
	  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:283)
	  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformUp(LogicalPlan.scala:29)
	  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$class.transformUp(AnalysisHelper.scala:158)
	  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
	  at org.apache.spark.sql.catalyst.optimizer.CollapseProject$.apply(Optimizer.scala:652)
	  at org.apache.spark.sql.catalyst.optimizer.CollapseProject$.apply(Optimizer.scala:650)
	  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:87)
	  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:84)
	  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
	  at scala.collection.immutable.List.foldLeft(List.scala:84)
	  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:84)
	  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:76)
	  at scala.collection.immutable.List.foreach(List.scala:392)
	  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:76)
	  at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:77)
	  - locked <0x2bea> (a org.apache.spark.sql.execution.streaming.IncrementalExecution)
	  at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:77)
	  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:73)
	  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:69)
	  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:78)
	  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:78)
	  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$4.apply(MicroBatchExecution.scala:528)
	  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$4.apply(MicroBatchExecution.scala:519)
	  at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
	  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:519)
	  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
	  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
	  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
	  at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
	  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
	  at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
	  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
	  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
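The frames `avroSchema$lzycompute` and `dataType$lzycompute` indicate that the schema is memoised in a Scala `lazy val` on the `AvroDataToCatalyst` instance, and in this trace it is being evaluated on the stream execution thread while the optimizer (`CollapseProject`) runs for a micro-batch. A `lazy val` caches its value per object only, so any new instance evaluates it again. Whether ABRiS ends up with fresh instances per micro-batch or per task (for example through plan copies, or through deserialization if the field is `@transient`) is an assumption here, not something the trace proves. The sketch below uses hypothetical stand-ins, `DecodeExpr` and `FakeRegistry`, purely to illustrate the Scala semantics.

```scala
// Hypothetical stand-in for the Schema Registry; counts "latest" lookups.
object FakeRegistry {
  var latestLookups = 0
  def latestVersionId(subject: String): Int = {
    latestLookups += 1
    3 // arbitrary version id for the illustration
  }
}

// Hypothetical stand-in for an immutable Catalyst-style expression that
// memoises its schema id in a per-instance lazy val.
final case class DecodeExpr(subject: String) {
  lazy val schemaId: Int = FakeRegistry.latestVersionId(subject)
}
```

Reading `schemaId` twice on the same instance hits the fake registry once; reading it on `expr.copy()` hits it again, because the copy starts with an unevaluated `lazy val`.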

@felipemmelo
Collaborator

Maybe there are issues accessing the Schema Registry?

Also, are you using 3.1.1?

@agolovenko
Author

As I already mentioned, I use ABRiS 3.1.1 with Spark 2.4.5 and Scala 2.11.12.
There are no problems with the schema registry: in fact, setting a concrete schema id instead of latest makes things run 5x faster.

@agolovenko
Author

Tried with Spark 2.4.4: no difference.
Here are some logs:

DEBUG	2020-04-08 19:31:03,902	11721	za.co.absa.abris.avro.read.confluent.SchemaManager	[Executor task launch worker for task 13]	Trying to get schema for id '100009'
DEBUG	2020-04-08 19:31:03,902	11721	za.co.absa.abris.avro.read.confluent.SchemaManager	[Executor task launch worker for task 15]	Trying to get schema for id '100009'
DEBUG	2020-04-08 19:31:03,902	11721	za.co.absa.abris.avro.read.confluent.SchemaManager	[Executor task launch worker for task 13]	Subject name resolved to: input2-value
DEBUG	2020-04-08 19:31:03,902	11721	za.co.absa.abris.avro.read.confluent.SchemaManager	[Executor task launch worker for task 15]	Subject name resolved to: input2-value
DEBUG	2020-04-08 19:31:03,902	11721	za.co.absa.abris.avro.read.confluent.SchemaManager	[Executor task launch worker for task 15]	Trying to get latest schema version id for subject 'input2-value'
DEBUG	2020-04-08 19:31:03,902	11721	za.co.absa.abris.avro.read.confluent.SchemaManager	[Executor task launch worker for task 13]	Trying to get latest schema version id for subject 'input2-value'
DEBUG	2020-04-08 19:31:03,902	11721	io.confluent.kafka.schemaregistry.client.rest.RestService	[Executor task launch worker for task 15]	Sending GET with input null to https://psrc-4kk0p.westeurope.azure.confluent.cloud/subjects/input2-value/versions/latest
DEBUG	2020-04-08 19:31:03,902	11721	za.co.absa.abris.avro.read.confluent.SchemaManager	[Executor task launch worker for task 14]	Trying to get schema for id '100009'
DEBUG	2020-04-08 19:31:03,994	11813	za.co.absa.abris.avro.read.confluent.SchemaManager	[Executor task launch worker for task 15]	Trying to get schema for subject 'input2-value' and id '100009'
DEBUG	2020-04-08 19:31:03,994	11813	za.co.absa.abris.avro.read.confluent.SchemaManager	[Executor task launch worker for task 14]	Subject name resolved to: input2-value
DEBUG	2020-04-08 19:31:03,994	11813	za.co.absa.abris.avro.read.confluent.SchemaManager	[Executor task launch worker for task 14]	Trying to get latest schema version id for subject 'input2-value'
DEBUG	2020-04-08 19:31:03,994	11813	io.confluent.kafka.schemaregistry.client.rest.RestService	[Executor task launch worker for task 13]	Sending GET with input null to https://psrc-4kk0p.westeurope.azure.confluent.cloud/subjects/input2-value/versions/latest
DEBUG	2020-04-08 19:31:04,082	11901	za.co.absa.abris.avro.read.confluent.SchemaManager	[Executor task launch worker for task 13]	Trying to get schema for subject 'input2-value' and id '100009'
DEBUG	2020-04-08 19:31:04,082	11901	io.confluent.kafka.schemaregistry.client.rest.RestService	[Executor task launch worker for task 14]	Sending GET with input null to https://psrc-4kk0p.westeurope.azure.confluent.cloud/subjects/input2-value/versions/latest
DEBUG	2020-04-08 19:31:04,171	11990	za.co.absa.abris.avro.read.confluent.SchemaManager	[Executor task launch worker for task 14]	Trying to get schema for subject 'input2-value' and id '100009'
DEBUG	2020-04-08 19:31:04,192	12011	org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection	[Executor task launch worker for task 14]	code for createexternalrow(if (isnull(input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true])) null else createexternalrow(if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].DataType.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].EventId.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Country.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].EventType.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].EventTimestamp.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].EventVersion.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].isNullAt) null else if (isnull(input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event)) null else createexternalrow(if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.ConfirmUrl.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.ClientId.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.Language.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.ClientCountryCode.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.Street.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.Number.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.Door.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.PostCode.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.City.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.UserId, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.SalesforceId.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.UserName.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.Gender.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.FirstName.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.Surname.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.BirthDate.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.Email.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.PhoneNumber.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.CountryCode.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.RegistrationDate.toString, if (input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.OccurredOn.toString, if (input[0, 
struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.isNullAt) null else input[0, struct<DataType:string,EventId:string,Country:string,EventType:string,EventTimestamp:string,EventVersion:string,Event:struct<ConfirmUrl:string,ClientId:string,Language:string,ClientCountryCode:string,Street:string,Number:string,Door:string,PostCode:string,City:string,UserId:bigint,SalesforceId:string,UserName:string,Gender:string,FirstName:string,Surname:string,BirthDate:string,Email:string,PhoneNumber:string,CountryCode:string,RegistrationDate:string,OccurredOn:string,Version:string>>, true].Event.Version.toString, StructField(ConfirmUrl,StringType,false), StructField(ClientId,StringType,false), ... 
20 more fields), StructField(DataType,StringType,true), StructField(EventId,StringType,true), StructField(Country,StringType,true), StructField(EventType,StringType,true), StructField(EventTimestamp,StringType,true), StructField(EventVersion,StringType,true), StructField(Event,StructType(StructField(ConfirmUrl,StringType,false), StructField(ClientId,StringType,false), StructField(Language,StringType,false), StructField(ClientCountryCode,StringType,false), StructField(Street,StringType,true), StructField(Number,StringType,true), StructField(Door,StringType,true), StructField(PostCode,StringType,true), StructField(City,StringType,true), StructField(UserId,LongType,false), StructField(SalesforceId,StringType,false), StructField(UserName,StringType,false), StructField(Gender,StringType,true), StructField(FirstName,StringType,true), StructField(Surname,StringType,true), StructField(BirthDate,StringType,false), StructField(Email,StringType,false), StructField(PhoneNumber,StringType,true), StructField(CountryCode,StringType,false), StructField(RegistrationDate,StringType,false), StructField(OccurredOn,StringType,false), StructField(Version,StringType,true)),true)), StructField(value,StructType(StructField(DataType,StringType,true), StructField(EventId,StringType,true), StructField(Country,StringType,true), StructField(EventType,StringType,true), StructField(EventTimestamp,StringType,true), StructField(EventVersion,StringType,true), StructField(Event,StructType(StructField(ConfirmUrl,StringType,false), StructField(ClientId,StringType,false), StructField(Language,StringType,false), StructField(ClientCountryCode,StringType,false), StructField(Street,StringType,true), StructField(Number,StringType,true), StructField(Door,StringType,true), StructField(PostCode,StringType,true), StructField(City,StringType,true), StructField(UserId,LongType,false), StructField(SalesforceId,StringType,false), StructField(UserName,StringType,false), StructField(Gender,StringType,true), 
StructField(FirstName,StringType,true), StructField(Surname,StringType,true), StructField(BirthDate,StringType,false), StructField(Email,StringType,false), StructField(PhoneNumber,StringType,true), StructField(CountryCode,StringType,false), StructField(RegistrationDate,StringType,false), StructField(OccurredOn,StringType,false), StructField(Version,StringType,true)),true)),true)):

@agolovenko
Author

Just an idea: could this be because my app is in fact a Structured Streaming app? It seems like AvroDataToCatalyst gets recreated for each batch. If so, is there a way around that?

```scala
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_confluent_avro
import za.co.absa.abris.avro.read.confluent.SchemaManager

val schemaRegistryConfig = Map(
  SchemaManager.PARAM_SCHEMA_REGISTRY_URL          -> "https://psrc-4kk0p.westeurope.azure.confluent.cloud",
  SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC        -> "input2",
  SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME,
  SchemaManager.PARAM_VALUE_SCHEMA_ID              -> "latest", // or a fixed id such as "100009"
  "basic.auth.credentials.source"                  -> "USER_INFO",
  "schema.registry.basic.auth.user.info"           -> "...",
  "auto.register.schemas"                          -> "false"
)

val upstream = consumeEventHub() // creates an upstream of messages

upstream
  .select(from_confluent_avro(col("body"), schemaRegistryConfig) as "value")
  .writeStream
  .option("checkpointLocation", "checkpoints")
  .format("console")
  .start()
  .awaitTermination()
```
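A minimal, Spark-free sketch of the hypothesis above. All names here (`Registry`, `LazyDecoder`, `PerBatchDemo`) are hypothetical stand-ins, not ABRiS classes: a `lazy val` caches only per instance, so if the object holding it is recreated for every micro-batch, the "latest" lookup runs once per batch instead of once per query.

```scala
// Hypothetical stand-in for the Schema Registry "latest" call; counts invocations.
object Registry {
  var latestLookups = 0
  def fetchLatestSchema(subject: String): String = {
    latestLookups += 1
    s"schema-for-$subject"
  }
}

// Hypothetical decoder that caches the reader schema in a lazy val (per instance only).
class LazyDecoder(subject: String) {
  lazy val readerSchema: String = Registry.fetchLatestSchema(subject)
  def decode(record: Array[Byte]): String = readerSchema // real code would decode `record` here
}

object PerBatchDemo {
  def run(batches: Int, recordsPerBatch: Int): Int = {
    Registry.latestLookups = 0
    for (_ <- 1 to batches) {
      val decoder = new LazyDecoder("input2-value") // recreated for every micro-batch
      for (_ <- 1 to recordsPerBatch) decoder.decode(Array.emptyByteArray)
    }
    Registry.latestLookups
  }

  def main(args: Array[String]): Unit =
    println(s"lookups: ${run(batches = 5, recordsPerBatch = 1000)}") // lookups: 5
}
```

The cache works within a batch (1000 records, one lookup), but the count still grows with the number of batches.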

@felipemmelo
Collaborator

Hi @agolovenko, sorry, I was on holiday. It shouldn't be, since the library was developed specifically for the Structured API. I'll try to reproduce your issue and get back to you ASAP.

@algorri94
Contributor

Hi, I can confirm this is happening to us as well. We are receiving many calls to the Schema Registry API requesting the latest version:
15:06:29 INFO: i.c.r.requests | 127.0.0.1 - - [13/Apr/2020:13:06:29 +0000] "GET /subjects/subject-value/versions/latest HTTP/1.1" 200 3611 2
It seems to happen only in the from_confluent_avro function; to_confluent_avro apparently works as expected.

@felipemmelo
Collaborator

Hi @agolovenko and @algorri94, once again, thanks a lot for the help.

We seem to have two situations here:

  1. Retrieval of Avro reader schema for every micro-batch
  2. Retrieval of Avro writer schema for every record

The former is definitely a "performance bug", and we can quickly address it by retrieving the schema before the Catalyst expression is invoked, which would achieve the same performance as when the schema is provided as a plain JSON file.
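A sketch of the fix described above, using hypothetical stand-ins (`SchemaRegistryStub`, `ResolvedDecoder`, `ResolveOnceDemo` are illustrative names, not ABRiS code): resolve `latest` once, before any per-batch object is built, and pass the resolved schema string in. Recreating the decoder per batch then costs no extra lookups.

```scala
// Hypothetical stand-in for the Schema Registry "latest" call; counts invocations.
object SchemaRegistryStub {
  var latestLookups = 0
  def fetchLatestSchema(subject: String): String = {
    latestLookups += 1
    s"schema-for-$subject"
  }
}

// The decoder receives an already-resolved schema instead of the string "latest".
final case class ResolvedDecoder(readerSchema: String) {
  def decode(record: Array[Byte]): String = readerSchema // real code would decode `record` here
}

object ResolveOnceDemo {
  def run(batches: Int, recordsPerBatch: Int): Int = {
    SchemaRegistryStub.latestLookups = 0
    // Resolved once, up front (on the driver, in the Spark case).
    val schema = SchemaRegistryStub.fetchLatestSchema("input2-value")
    for (_ <- 1 to batches) {
      val decoder = ResolvedDecoder(schema) // still recreated per batch, but no lookup
      for (_ <- 1 to recordsPerBatch) decoder.decode(Array.emptyByteArray)
    }
    SchemaRegistryStub.latestLookups
  }

  def main(args: Array[String]): Unit =
    println(s"lookups: ${run(batches = 5, recordsPerBatch = 1000)}") // lookups: 1
}
```

The trade-off is that a schema registered after the query starts is not picked up until the query is restarted.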

The latter is a bit more involved. As you certainly know, Avro uses writer and reader schemas to support schema evolution. For compatibility reasons we cannot assume a single writer schema for the whole execution, so we rely on CachedSchemaRegistryClient to cache it. It should be doing its job locally, i.e. it should not reach the Schema Registry backend for an id that is already cached, as we can see here.
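A minimal sketch of caching writer schemas by id, assuming a hypothetical `fetchById` function in place of a real registry call (`WriterSchemaCache` and `WriterCacheDemo` are illustrative names). Because a Schema Registry id always refers to the same schema, entries never need to expire. This shows the idea a cached client relies on, not CachedSchemaRegistryClient's actual implementation.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical cache: one remote lookup per distinct writer-schema id, then served locally.
final class WriterSchemaCache(fetchById: Int => String) {
  private val byId = TrieMap.empty[Int, String]
  def writerSchema(id: Int): String = byId.getOrElseUpdate(id, fetchById(id))
}

object WriterCacheDemo {
  var remoteCalls = 0

  // Hypothetical registry lookup; counts how often it is actually called.
  def fakeRegistry(id: Int): String = {
    remoteCalls += 1
    s"writer-schema-$id"
  }

  def run(): Int = {
    remoteCalls = 0
    val cache = new WriterSchemaCache(fakeRegistry)
    // Ids as they might appear across many records: only two distinct writer schemas.
    val idsInPayloads = Seq(100009, 100009, 100010, 100009, 100010)
    idsInPayloads.foreach(cache.writerSchema)
    remoteCalls
  }

  def main(args: Array[String]): Unit = println(s"remote calls: ${run()}") // remote calls: 2
}
```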

@cerveada is currently away, but I'll set up a chat with him as soon as he's back so we can decide how best to address these questions. In the meantime, if you have ideas or would like to give a PR a try, please feel free.

Cheers.

@agolovenko
Author

agolovenko commented Apr 20, 2020

Thanks @felipemmelo! Here's my comment:

we rely on CachedSchemaRegistryClient to cache it, which should be doing its job locally

In reality, not all CachedSchemaRegistryClient calls are cached; you can browse the source code if in doubt. getLatestSchema is one of the non-cached calls, and this behavior makes a lot of sense for a client that is asked "what is the latest schema right now?".

The problem is that this model isn't the best fit for this library. You probably want to cache the result of this call, but not forever, only for some period of time...
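A sketch of the time-bounded caching suggested above, assuming a hypothetical `TtlCache` wrapper (not part of ABRiS or the Confluent client) around whatever call fetches the latest schema id. The TTL is a constructor parameter, so it could be exposed as a config option, and the clock is injectable so the behavior can be tested without sleeping.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical cache that reloads its value once it is older than ttlMillis.
final class TtlCache[A](ttlMillis: Long, load: () => A, now: () => Long = () => System.currentTimeMillis()) {
  // Cached value together with the time it was loaded.
  private val current = new AtomicReference[Option[(A, Long)]](None)

  def get(): A = {
    val t = now()
    current.get() match {
      case Some((value, loadedAt)) if t - loadedAt < ttlMillis => value // still fresh
      case _ =>
        val value = load() // expired or never loaded: hit the registry
        current.set(Some((value, t)))
        value
    }
  }
}

object TtlDemo {
  def run(): (Int, Int) = {
    var clock = 0L
    var lookups = 0
    val latestId = new TtlCache[Int](
      ttlMillis = 60000L,
      load = () => { lookups += 1; 100009 }, // hypothetical "latest id" lookup
      now = () => clock
    )
    (1 to 1000).foreach(_ => latestId.get()) // many records inside one TTL window
    val withinTtl = lookups
    clock += 60000L // advance the fake clock past the TTL
    latestId.get()
    (withinTtl, lookups)
  }

  def main(args: Array[String]): Unit = println(run()) // (1,2)
}
```

Under contention two threads may both see an expired entry and both reload, which costs an extra request; a production version might add locking around the reload.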

@felipemmelo
Collaborator

Hi @agolovenko, here are my replies to your comments.

  1. The method you're referring to is getLatestSchemaMetadata, right? This one

  2. If so, then of course it has to query Schema Registry for the latest version every time. However, this only happens when getting the schema to be used by Catalyst, as you can confirm here

  3. The subsequent calls can and must be cached, since they are based on the schema id at the start of each record's payload, as you can see here
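For reference, a sketch of how the per-record schema id mentioned in point 3 can be read, assuming the standard Confluent wire format: one magic byte `0`, then a 4-byte big-endian schema id, then the Avro-encoded body. `ConfluentHeader` is a hypothetical helper, not an ABRiS API.

```scala
import java.nio.ByteBuffer

// Hypothetical helper that reads the schema id from a Confluent-framed payload.
object ConfluentHeader {
  val MagicByte: Byte = 0

  def schemaId(payload: Array[Byte]): Int = {
    require(payload.length >= 5, "payload too short for a Confluent header")
    val buf = ByteBuffer.wrap(payload) // ByteBuffer reads big-endian by default
    val magic = buf.get()
    require(magic == MagicByte, s"unknown magic byte: $magic")
    buf.getInt() // bytes 1..4 hold the schema id; the Avro body starts at byte 5
  }

  def main(args: Array[String]): Unit = {
    val payload = ByteBuffer.allocate(6).put(MagicByte).putInt(100009).put(42.toByte).array()
    println(schemaId(payload)) // 100009
  }
}
```

Since this id is a concrete integer rather than "latest", it is safe to cache forever.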

Anyway, thank you very much for getting back to us; we'll release an improvement for this soon.

@cerveada
Collaborator

cerveada commented May 27, 2020

Hello, in the new versions (2.3.0 and 3.2.0) this should be fixed. In the case of AvroDataToCatalyst:

  • The reader schema is loaded only once, on the driver, before the Spark expression is even created.
  • The writer schema is still loaded inside the expression, but it should be cached: since the id in the Avro payload is an integer, it shouldn't cause the problems seen with latest.

Could you test the new version and let us know if it works?

@agolovenko
Author

Thanks guys! Great job!

@cerveada
Collaborator

cerveada commented Jun 1, 2020

You are welcome. Since there seems to be no issue anymore, I'm closing this ticket. Please open a new one if you run into any problems.
