/
AWSKafkaAvroDeserializer.java
149 lines (128 loc) · 5.36 KB
/
AWSKafkaAvroDeserializer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.schemaregistry.deserializers.avro;
import com.amazonaws.services.schemaregistry.common.AWSDeserializerInput;
import com.amazonaws.services.schemaregistry.deserializers.AWSDeserializer;
import com.amazonaws.services.schemaregistry.deserializers.AWSDeserializerDataParser;
import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Deserializer;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import java.nio.ByteBuffer;
import java.util.Map;
/**
 * AWS Kafka Avro Deserializer responsible for de-serializing Avro-encoded
 * data that was produced with the AWS Glue Schema Registry serializer.
 */
@Slf4j
public class AWSKafkaAvroDeserializer implements Deserializer<Object> {
    @Getter
    private final AwsCredentialsProvider credentialProvider;

    @Getter
    @Setter
    private AWSDeserializer awsDeserializer;

    // Fallback used for records that do not carry the schema registry header
    // byte (e.g. data written by a plain Kafka serializer before migration).
    private SecondaryDeserializer secondaryDeserializer = SecondaryDeserializer.newInstance();

    /**
     * Constructor used by the Kafka consumer when instantiating this class
     * reflectively. Credentials come from the default AWS provider chain;
     * {@link #configure(Map, boolean)} must be called before use.
     */
    public AWSKafkaAvroDeserializer() {
        this(DefaultCredentialsProvider.builder().build(), null);
    }

    /**
     * Constructor that configures the deserializer eagerly using the default
     * AWS credentials provider chain.
     *
     * @param configs configuration elements for the de-serializer
     */
    public AWSKafkaAvroDeserializer(@NonNull Map<String, ?> configs) {
        this(DefaultCredentialsProvider.builder().build(), configs);
    }

    /**
     * Constructor accepting AWSCredentialsProvider.
     *
     * @param credentialProvider AWSCredentialsProvider instance.
     * @param configs            configuration elements for the de-serializer; may be
     *                           null, in which case {@link #configure(Map, boolean)}
     *                           must be invoked before use.
     */
    public AWSKafkaAvroDeserializer(AwsCredentialsProvider credentialProvider, Map<String, ?> configs) {
        this.credentialProvider = credentialProvider;
        if (configs != null) {
            configure(configs, false);
        }
    }

    /**
     * Configuration method for injecting configuration properties.
     *
     * @param configs configuration elements for de-serializer
     * @param isKey   true if key, false otherwise
     */
    @Override
    public void configure(@NonNull Map<String, ?> configs, boolean isKey) {
        // NOTE(review): this logs the entire config map at INFO; confirm it can
        // never contain credentials or other secrets.
        log.info("Configuring Amazon Glue Schema Registry Service using these properties: {}", configs);
        this.awsDeserializer = AWSDeserializer.builder()
                .credentialProvider(this.credentialProvider)
                .configs(configs)
                .build();
        if (configs.containsKey(AWSSchemaRegistryConstants.SECONDARY_DESERIALIZER)) {
            configureSecondaryDeser(configs, isKey);
        }
    }

    /**
     * De-serialize operation for de-serializing the byte array to an Object.
     *
     * @param topic Kafka topic name
     * @param data  serialized data to be de-serialized in byte array
     * @return de-serialized object instance, or null when {@code data} is null
     */
    @Override
    public Object deserialize(String topic, byte[] data) {
        if (data == null) {
            // Kafka passes null for tombstone records; propagate it unchanged.
            return null;
        }
        return deserializeByHeaderVersionByte(topic, data, getHeaderVersionByte(data));
    }

    /**
     * Resource clean up for Closeable. This method internally shuts down the
     * background thread for publishing cloud watch metrics. After this is called, a
     * new instance of this class should be created to enable the metrics publishing
     * feature.
     */
    @Override
    public void close() {
        // Fix: guard against NPE when close() is invoked on an instance that was
        // never configured (no-arg constructor, or two-arg constructor with null
        // configs, without a subsequent configure() call).
        if (this.awsDeserializer != null) {
            this.awsDeserializer.close();
        }
        // NOTE(review): the secondary deserializer is never closed here; verify
        // whether SecondaryDeserializer owns resources that need releasing.
    }

    /**
     * Wraps raw bytes and the topic name into the input the AWS deserializer expects.
     */
    private AWSDeserializerInput prepareInput(byte[] data, String topic) {
        return AWSDeserializerInput.builder()
                .buffer(ByteBuffer.wrap(data))
                .transportName(topic)
                .build();
    }

    /**
     * Configure the secondary de-serializer and validate if it's from Kafka.
     *
     * @throws AWSSchemaRegistryException if the configured class is not a Kafka deserializer
     */
    private void configureSecondaryDeser(Map<String, ?> configs, boolean isKey) {
        if (!secondaryDeserializer.validate(configs)) {
            throw new AWSSchemaRegistryException("The secondary deserializer is not from Kafka");
        }
        secondaryDeserializer.configure(configs, isKey);
    }

    /**
     * De-serialize operation depend on the value of header version byte: registry-encoded
     * records go through the AWS deserializer, everything else to the secondary one.
     */
    private Object deserializeByHeaderVersionByte(String topic, byte[] data, Byte headerVersionByte) {
        return headerVersionByte.equals(AWSSchemaRegistryConstants.HEADER_VERSION_BYTE)
               ? this.awsDeserializer.deserialize(prepareInput(data, topic))
               : secondaryDeserializer.deserialize(topic, data);
    }

    /**
     * Extracts the header version byte from the leading bytes of the record.
     */
    private Byte getHeaderVersionByte(byte[] data) {
        AWSDeserializerDataParser dataParser = AWSDeserializerDataParser.getInstance();
        return dataParser.getHeaderVersionByte(ByteBuffer.wrap(data));
    }
}