-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
schemaRegistryAvroSerializer.ts
213 lines (190 loc) · 7.32 KB
/
schemaRegistryAvroSerializer.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { SchemaRegistry } from "@azure/schema-registry";
import * as avro from "avsc";
// REVIEW: This should go into a shared doc somewhere that all of the different
// language serializers' docs can reference.
//
// Wire format
// -----------
//
// This is a standard meant to be reused across schema registry serializers for
// different formats. We only have an Avro serializer at this point, so picking
// apart this format is inlined here, but handling could be extracted and shared
// between serializers of different formats in the future.
//
// - [4 bytes: Format Indicator]
//   - Currently always zero to indicate the format below.
//
// - [32 bytes: Schema ID]
//   - UTF-8 hexadecimal representation of GUID.
//   - 32 hex digits, no hyphens.
//   - Same format and byte order as string from Schema Registry service.
//   - This will soon be revised to an 8-byte long value with the format
//     indicator bumped.
//
// - [Remaining bytes: Avro payload (in general, format-specific payload)]
//   - Avro Binary Encoding
//   - NOT Avro Object Container File, which includes the schema and defeats
//     the purpose of this serializer to move the schema out of the message
//     payload and into the schema registry.
//
/** Value of the leading 4-byte big-endian format indicator (currently always zero). */
const FORMAT_INDICATOR = 0;
/** Byte offset of the schema ID; it immediately follows the 4-byte format indicator. */
const SCHEMA_ID_OFFSET = 4;
/** Number of bytes the UTF-8 schema ID occupies. */
const SCHEMA_ID_LENGTH = 32;
/** Byte offset of the Avro payload, i.e. the first byte after the schema ID. */
const PAYLOAD_OFFSET = SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH;
/**
 * A schema that has been resolved against the registry, kept in the
 * serializer's caches so repeat lookups by ID or by content avoid another
 * registry request.
 */
interface CacheEntry {
  /** Identifier the registry associates with the schema. */
  id: string;
  /** Parsed avsc `Type` used to encode and decode values for this schema. */
  type: avro.Type;
}
/**
 * Options for constructing a SchemaRegistryAvroSerializer.
 */
export interface SchemaRegistryAvroSerializerOptions {
  /**
   * When true, schemas passed to `serialize` are registered with the schema
   * registry. When false or omitted (the default), the serializer only looks
   * up the ID of an already-registered schema and fails if the schema has not
   * been registered.
   *
   * Automatic schema registration is NOT recommended for production scenarios.
   */
  autoRegisterSchemas?: boolean;
}
/**
 * Avro serializer that obtains schemas from a schema registry and does not
 * pack schemas into its payloads.
 *
 * Each serialized buffer contains a format indicator and the registry's schema
 * ID followed by the Avro binary encoding of the value; the schema itself is
 * resolved through the registry (and cached) when deserializing.
 */
export class SchemaRegistryAvroSerializer {
  /**
   * Creates a new serializer.
   *
   * @param registry Schema Registry where schemas are registered and obtained.
   * Usually this is a SchemaRegistryClient instance.
   *
   * @param schemaGroup The schema group to use when making requests to the
   * registry.
   *
   * @param options Optional settings. `autoRegisterSchemas` defaults to false.
   */
  constructor(
    registry: SchemaRegistry,
    schemaGroup: string,
    options?: SchemaRegistryAvroSerializerOptions
  ) {
    this.registry = registry;
    this.schemaGroup = schemaGroup;
    this.autoRegisterSchemas = options?.autoRegisterSchemas ?? false;
  }

  /** Group sent to the registry when looking up or registering a schema by content. */
  private readonly schemaGroup: string;
  /** Registry used to resolve schema IDs and schema content. */
  private readonly registry: SchemaRegistry;
  /** Whether unknown schemas are registered (true) or only looked up (false). */
  private readonly autoRegisterSchemas: boolean;

  // REVIEW: signature.
  //
  // - Better to serialize into a stream? I aborted that for now as I wanted to
  //   do the simplest thing that could possibly work first to make sure there
  //   were no blockers in our dependencies. I also wanted to get feedback on
  //   what the API shape should be before diving into that.
  //
  // - This type should ultimately be able to implement a core ObjectSerializer
  //   interface. Do we know what that would look like? Maybe it takes `any` as
  //   the format-specific schema/type info? Or does it always take a
  //   format-specific schema string?
  //
  //   The C#/Java approach of passing Type and assuming every serializer can
  //   get its schema by reflecting on the type does not work for JavaScript. We
  //   need to support arbitrary objects that match a schema.
  //
  //   Maybe each format expects a different property on this arg so that you
  //   could at least pass enough info for multiple formats, and then your call
  //   to ObjectSerializer is at least not tied to a single format?
  //
  // - Should we wrap all errors thrown by avsc to avoid having our exception
  //   contract being tied to its implementation details?

  /**
   * Serializes a value into a buffer.
   *
   * @param value The value to serialize.
   * @param schema The Avro schema to use.
   * @returns A new buffer with the serialized value
   * @throws Error if the registry's schema ID does not occupy exactly
   * SCHEMA_ID_LENGTH bytes in UTF-8, since it could not be stored in the
   * fixed-size ID field without corrupting it.
   */
  async serialize(value: unknown, schema: string): Promise<Buffer> {
    const entry = await this.getSchemaByContent(schema);

    // The wire format reserves a fixed number of bytes for the ID. Writing a
    // longer ID would silently truncate it, and a shorter one would leave
    // trailing zero bytes; either way the ID read back on deserialize is wrong.
    const schemaIdBytes = Buffer.from(entry.id, "utf-8");
    if (schemaIdBytes.length !== SCHEMA_ID_LENGTH) {
      throw new Error(
        `Schema ID '${entry.id}' is ${schemaIdBytes.length} bytes long in UTF-8, but the wire format requires exactly ${SCHEMA_ID_LENGTH} bytes.`
      );
    }

    const payload = entry.type.toBuffer(value);
    const buffer = Buffer.alloc(PAYLOAD_OFFSET + payload.length);
    buffer.writeUInt32BE(FORMAT_INDICATOR, 0);
    schemaIdBytes.copy(buffer, SCHEMA_ID_OFFSET);
    payload.copy(buffer, PAYLOAD_OFFSET);
    return buffer;
  }

  // REVIEW: signature. See serialize and s/serialize into/deserialize from/.
  /**
   * Deserializes a value from a buffer.
   *
   * @param buffer The buffer with the serialized value.
   * @returns The deserialized value.
   * @throws RangeError if the buffer is shorter than the fixed header.
   * @throws TypeError if the format indicator is not the supported one.
   */
  async deserialize<T>(buffer: Buffer): Promise<T> {
    if (buffer.length < PAYLOAD_OFFSET) {
      throw new RangeError("Buffer is too small to have the correct format.");
    }

    const format = buffer.readUInt32BE(0);
    if (format !== FORMAT_INDICATOR) {
      throw new TypeError(`Buffer has unknown format indicator: 0x${format.toString(16)}`);
    }

    // Decode the ID directly from its byte range; no intermediate slice needed.
    const schemaId = buffer.toString("utf-8", SCHEMA_ID_OFFSET, PAYLOAD_OFFSET);
    const schema = await this.getSchemaById(schemaId);
    const payloadBuffer = buffer.slice(PAYLOAD_OFFSET);
    return schema.type.fromBuffer(payloadBuffer);
  }

  //
  // Avoid hitting the schema registry on every call by caching schemas.
  //
  // Currently the cache can only be discarded by throwing away the serializer
  // and creating a new one, but this will be revisited after the initial
  // preview:
  //
  //   https://github.com/Azure/azure-sdk-for-js/issues/10438
  //
  private readonly cacheByContent = new Map<string, CacheEntry>();
  private readonly cacheById = new Map<string, CacheEntry>();

  /**
   * Resolves a schema ID to its parsed Avro type, consulting the cache first.
   *
   * @throws Error if the registry reports a serialization type other than Avro.
   */
  private async getSchemaById(schemaId: string): Promise<CacheEntry> {
    const cached = this.cacheById.get(schemaId);
    if (cached) {
      return cached;
    }

    const schemaResponse = await this.registry.getSchemaById(schemaId);
    // Serialization type comparison is case-insensitive.
    if (!/^avro$/i.test(schemaResponse.serializationType)) {
      throw new Error(
        `Schema with ID '${schemaResponse.id}' has serialization type '${schemaResponse.serializationType}', not 'avro'.`
      );
    }

    const avroType = avro.Type.forSchema(JSON.parse(schemaResponse.content));
    return this.cache(schemaId, schemaResponse.content, avroType);
  }

  /**
   * Resolves schema content to its registry ID and parsed Avro type,
   * consulting the cache first. Registers the schema when
   * `autoRegisterSchemas` is set; otherwise only looks up its ID.
   *
   * @throws Error if the parsed schema has no name (the registry needs one).
   */
  private async getSchemaByContent(schema: string): Promise<CacheEntry> {
    const cached = this.cacheByContent.get(schema);
    if (cached) {
      return cached;
    }

    const avroType = avro.Type.forSchema(JSON.parse(schema));
    if (!avroType.name) {
      throw new Error("Schema must have a name.");
    }

    const description = {
      group: this.schemaGroup,
      name: avroType.name,
      serializationType: "avro",
      content: schema
    };

    const schemaIdResponse = this.autoRegisterSchemas
      ? await this.registry.registerSchema(description)
      : await this.registry.getSchemaId(description);

    return this.cache(schemaIdResponse.id, schema, avroType);
  }

  /**
   * Stores one entry under both its content and its ID so that either lookup
   * path finds it next time, and returns that entry.
   */
  private cache(id: string, schema: string, type: avro.Type): CacheEntry {
    const entry = { id, type };
    this.cacheByContent.set(schema, entry);
    this.cacheById.set(id, entry);
    return entry;
  }
}