Spring boot kafka consumer issue deserializing GSR AVRO GENERIC_RECORD #241

Closed
pc-akothapeta opened this issue Jan 5, 2023 · 3 comments
Labels: question (Further information is requested)


pc-akothapeta commented Jan 5, 2023

I have topics written by Kafka Connect in AVRO GENERIC_RECORD format using GSR (AWS Glue Schema Registry). I am able to consume them from a plain Java program by following the documentation. However, I am having difficulty consuming them from a Spring Boot application.

My simple config class

@EnableKafka
@Configuration

public class KafkaAvroConsumerConfig {

	@Value("${spring.kafka.bootstrap-servers}")
	private String brokers;
	@Value("${spring.kafka.consumer.group-id}")
	private String groupId;

	// Creating a Listener
	@Bean
	public ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> concurrentKafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		return factory;
	}

	@Bean
	public ConsumerFactory<GenericRecord, GenericRecord> consumerFactory() {
		return new DefaultKafkaConsumerFactory<>(consumerConfigs());
	}

	@Bean
	public Map<String, Object> consumerConfigs() {
		// Creating a Map of string-object pairs
		Map<String, Object> config = new HashMap<>();

		// Adding the Configuration
		config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
		config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

		config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
		config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());

		config.put(AWSSchemaRegistryConstants.AWS_REGION, region);
		config.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName);

		config.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
		config.put(AWSSchemaRegistryConstants.SCHEMA_NAMING_GENERATION_CLASS,
				MySchemaNamingStrategy.class.getName());

		return config;
	}
}

And listener class

@Component

public class KafkaAvroConsumer {

	@Autowired
	KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate;

	@KafkaListener(topics = "gsr1.HR.DEPARTMENTS")
	public void listenDepartment(ConsumerRecord<GenericRecord, GenericRecord> record) {

		//System.out.println("DEPARTMENTS key   schema = " + record.key().getSchema().toString());
		GenericRecord key = record.key();
		GenericRecord value = record.value();
		System.out.println("            record.key() = " + key);
		System.out.println("          record.value() = " + value);
		System.out.println("      Key  DEPARTMENT_ID = " + key.get("DEPARTMENT_ID"));
		System.out.println("         DEPARTMENT_NAME = " + (String) value.get("DEPARTMENT_NAME"));
	}

}

This gives me an error at "GenericRecord key = record.key();". It looks like the key and value did not get deserialized to GenericRecord:

Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class org.apache.avro.generic.GenericRecord (java.lang.String is in module java.base of loader 'bootstrap'; org.apache.avro.generic.GenericRecord is in unnamed module of loader 'app')

Looking at the Spring documentation, the DefaultKafkaConsumerFactory constructor can also take key and value deserializer instances as parameters. So I tried the following, but it doesn't compile, since GlueSchemaRegistryKafkaDeserializer doesn't take a type argument either:

	public ConsumerFactory<GenericRecord, GenericRecord> consumerFactory() {
		Deserializer<GenericRecord> avroDeser =  new GlueSchemaRegistryKafkaDeserializer();
		avroDeser.configure(consumerConfigs(), false);
		return new DefaultKafkaConsumerFactory<>(consumerConfigs(), avroDeser, avroDeser);
	}
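
As a sketch, an unchecked cast would at least get this to compile, assuming GlueSchemaRegistryKafkaDeserializer implements Deserializer<Object>; I have not verified that this works end to end:

	@Bean
	public ConsumerFactory<GenericRecord, GenericRecord> consumerFactory() {
		// Unchecked cast: GlueSchemaRegistryKafkaDeserializer is not generic over the record type
		@SuppressWarnings("unchecked")
		Deserializer<GenericRecord> avroDeser =
				(Deserializer<GenericRecord>) (Deserializer<?>) new GlueSchemaRegistryKafkaDeserializer();
		avroDeser.configure(consumerConfigs(), false); // isKey = false; the same instance is reused for the key here
		return new DefaultKafkaConsumerFactory<>(consumerConfigs(), avroDeser, avroDeser);
	}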

Any help on how to get this to work would be appreciated.

blacktooth (Contributor) commented Jan 6, 2023

  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());

From the error message, it seems like the key is of String type.
Is there a reason why you are setting it to GlueSchemaRegistryKafkaDeserializer instead of a String deserializer?
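
For example, something like this (a sketch, assuming the key really is a plain string; StringDeserializer here is org.apache.kafka.common.serialization.StringDeserializer from the Kafka clients):

  // keep the Glue deserializer for the Avro value, but use StringDeserializer for the key
  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());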

blacktooth added the "question" (Further information is requested) label on Jan 6, 2023
pc-akothapeta (Author) commented Jan 6, 2023

  config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());

From the error message, it seems like the key is of String type. Is there a reason why you are setting it to GlueSchemaRegistryKafkaDeserializer instead of a String deserializer?

In my case both the key and the value are AVRO messages. I can consume the messages fine with the plain Java program, and both key and value get deserialized properly. I am just not able to get it to work with Spring Boot.

pc-akothapeta (Author) commented

I figured out what the issue is. Spring Boot expects the listener container factory bean in the config class to be named kafkaListenerContainerFactory. I had named it concurrentKafkaListenerContainerFactory, so my consumer and Glue configuration was not picked up and the auto-configured default factory was used instead.

By default, a bean with name kafkaListenerContainerFactory is
expected.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#kafka-listener-annotation
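
For reference, a minimal sketch of the renamed factory bean; everything else in the config class above stays the same:

	// Renamed so the @KafkaListener support picks up this factory instead of the auto-configured default
	@Bean
	public ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<GenericRecord, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		return factory;
	}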
