Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema registry 3/N #1363

Merged
merged 79 commits into from Mar 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
fe86173
Schema Registry proto changes
Feb 13, 2018
6db4333
Infrastructure to store schemas
Feb 13, 2018
a40e96d
A default schema registry implementation
Feb 13, 2018
c12b69c
Merge branch 'master' into schema-registry-1
Feb 13, 2018
b5fa7a6
Renumber schema fields
Feb 13, 2018
3d1b465
Merge branch 'schema-registry-1' into schema-registry-2
Feb 13, 2018
815d518
Merge branch 'schema-registry-2' into schema-registry-3
Feb 13, 2018
4525dce
Update Pulsar API with schema changes
Feb 14, 2018
685b2df
Merge branch 'schema-registry-1' into schema-registry-2
Feb 14, 2018
0895f12
Merge branch 'schema-registry-2' into schema-registry-3
Feb 14, 2018
33a4363
Revert field number change
Feb 14, 2018
46cb957
Merge branch 'master' into schema-registry-1
Feb 23, 2018
e5e650b
Merge branch 'schema-registry-1' into schema-registry-2
Feb 23, 2018
9268478
Merge branch 'schema-registry-2' into schema-registry-3
Feb 23, 2018
74aa08c
Fix merge conflict
Feb 23, 2018
cd2ec8d
Merge branch 'schema-registry-2' into schema-registry-3
Feb 23, 2018
f9bd5e5
Merge branch 'master' into schema-registry-1
Feb 26, 2018
61897b3
Merge branch 'schema-registry-1' into schema-registry-2
Feb 26, 2018
3383efd
Merge branch 'schema-registry-2' into schema-registry-3
Feb 26, 2018
f20b920
Merge branch 'master' into schema-registry-1
Feb 28, 2018
184beb9
Merge branch 'schema-registry-1' into schema-registry-2
Feb 28, 2018
6530f44
Merge branch 'schema-registry-2' into schema-registry-3
Feb 28, 2018
64df492
Fix broken merge
Feb 28, 2018
2152a9f
Merge branch 'schema-registry-2' into schema-registry-3
Feb 28, 2018
0405750
Merge branch 'master' into schema-registry-1
Mar 1, 2018
c0d68e5
Address issues in review
Mar 1, 2018
a2de9ad
Merge branch 'schema-registry-1' into schema-registry-2
Mar 1, 2018
d68696d
Merge branch 'schema-registry-2' into schema-registry-3
Mar 1, 2018
d9147a6
Add schema type back to proto definition
Mar 1, 2018
9b11edf
Merge branch 'schema-registry-1' into schema-registry-2
Mar 1, 2018
3e8e31e
Address comments regarding lombok usage
Mar 1, 2018
cf8dd9a
Remove reserved future enum fields
Mar 1, 2018
077ec3c
regenerate code from protobuf
Mar 1, 2018
d2010e2
Merge branch 'schema-registry-1' into schema-registry-2
Mar 1, 2018
a2b09b4
Remove unused code
Mar 1, 2018
e7e72f4
Add schema version to producer success message
Mar 1, 2018
9753933
Merge branch 'schema-registry-1' into schema-registry-2
Mar 1, 2018
4b788ea
plumb schema through to producer
Mar 1, 2018
7b902f6
Revert "Add schema version to producer success message"
Mar 1, 2018
755ac8c
Merge branch 'schema-registry-1' into schema-registry-2
Mar 1, 2018
dda21bb
Revert "Revert "Add schema version to producer success message""
Mar 1, 2018
14f18bb
Merge branch 'master' into schema-registry-1
Mar 1, 2018
efb09de
Merge branch 'schema-registry-1' into schema-registry-2
Mar 1, 2018
d29d87c
Persist schema on producer connect
Mar 1, 2018
a13f1fc
Merge branch 'schema-registry-2' into schema-registry-3
Mar 1, 2018
fdec73a
Merge branch 'master' into schema-registry-1
Mar 2, 2018
23f8475
Merge branch 'schema-registry-1' into schema-registry-2
Mar 2, 2018
1b10dd2
Add principal to schema on publish
Mar 2, 2018
92acf7b
Merge branch 'schema-registry-2' into schema-registry-3
Mar 2, 2018
3145bbe
Reformat function for readability
Mar 2, 2018
6026e52
Remove unused protoc profile
Mar 2, 2018
661c757
Rename put on schema registry to putIfAbsent
Mar 2, 2018
2706f23
fix lombok tomfoolery on builder
Mar 2, 2018
da59c5b
Reformat function for readability
Mar 2, 2018
96562d7
Remove unused protoc profile
Mar 2, 2018
3243249
Rename put on schema registry to putIfAbsent
Mar 2, 2018
9484f2b
fix compile errors from parent branch changes
Mar 2, 2018
2b6a179
plumb hash through and allow lookup by data
Mar 2, 2018
7d4089d
wip
Mar 2, 2018
989441f
run tests
Mar 2, 2018
03da686
wip: address review comments
Mar 2, 2018
b578ff9
switch underscore to slash in schema name
Mar 2, 2018
b9a2596
blah
Mar 2, 2018
de01cb0
Merge remote-tracking branch 'origin/master' into schema-registry-2
Mar 5, 2018
9103f52
Merge branch 'schema-registry-2' into schema-registry-3
Mar 5, 2018
9800acf
Get duplicate schema detection to work
Mar 5, 2018
9133378
Merge branch 'master' into schema-registry-2
Mar 5, 2018
9947dd6
Fix protobuf version incompatibility
Mar 5, 2018
a8765e0
Merge branch 'master' into schema-registry-2
Mar 5, 2018
3edaa1f
Merge branch 'schema-registry-2' into schema-registry-3
Mar 5, 2018
3d489a3
Merge branch 'master' into schema-registry-2
Mar 6, 2018
b36a016
Merge branch 'schema-registry-2' into schema-registry-3
Mar 6, 2018
d756cb0
Merge branch 'master' into schema-registry-3
Mar 9, 2018
daf1161
fix merge issues
Mar 9, 2018
fcbbb21
Merge branch 'master' into schema-registry-3
Mar 12, 2018
c74bbcd
Fix license headers
Mar 12, 2018
2d4b75a
Address review
Mar 12, 2018
517bdff
Merge branch 'master' into schema-registry-3
Mar 13, 2018
5a1d53d
Merge branch 'master' into schema-registry-3
Mar 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view

Large diffs are not rendered by default.

@@ -0,0 +1,33 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;

import javax.validation.constraints.NotNull;
import org.apache.pulsar.broker.PulsarService;

@SuppressWarnings("unused")
public class BookkeeperSchemaStorageFactory implements SchemaStorageFactory {
@Override
@NotNull
public SchemaStorage create(PulsarService pulsar) throws Exception {
BookkeeperSchemaStorage service = new BookkeeperSchemaStorage(pulsar);
service.init();
return service;
}
}
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;

import com.google.common.base.MoreObjects;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.apache.pulsar.common.schema.SchemaVersion;

class LongSchemaVersion implements SchemaVersion {
private final long version;

LongSchemaVersion(long version) {
this.version = version;
}

public long getVersion() {
return version;
}

@Override
public byte[] bytes() {
ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE);
buffer.putLong(version);
buffer.rewind();
return buffer.array();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LongSchemaVersion that = (LongSchemaVersion) o;
return version == that.version;
}

@Override
public int hashCode() {

return Objects.hash(version);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("version", version)
.toString();
}
}
Expand Up @@ -23,10 +23,13 @@
import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -38,6 +41,7 @@
import org.apache.pulsar.common.schema.SchemaVersion;

public class SchemaRegistryServiceImpl implements SchemaRegistryService {
private static HashFunction hashFunction = Hashing.sha256();
private final SchemaStorage schemaStorage;
private final Clock clock;

Expand Down Expand Up @@ -76,6 +80,7 @@ public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, SchemaVer
@Override
@NotNull
public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema) {
byte[] context = hashFunction.hashBytes(schema.getData()).asBytes();
SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
.setType(Functions.convertFromDomainType(schema.getType()))
.setSchema(ByteString.copyFrom(schema.getData()))
Expand All @@ -85,14 +90,14 @@ public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, Schem
.setTimestamp(clock.millis())
.addAllProps(toPairs(schema.getProps()))
.build();
return schemaStorage.put(schemaId, info.toByteArray());
return schemaStorage.put(schemaId, info.toByteArray(), context);
}

@Override
@NotNull
public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String user) {
byte[] deletedEntry = deleted(schemaId, user).toByteArray();
return schemaStorage.put(schemaId, deletedEntry);
return schemaStorage.put(schemaId, deletedEntry, new byte[]{});
}

@Override
Expand Down Expand Up @@ -156,6 +161,9 @@ static Map<String, String> toMap(List<SchemaRegistryFormat.SchemaInfo.KeyValuePa
}

static List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> toPairs(Map<String, String> map) {
if (isNull(map)) {
return Collections.emptyList();
}
List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> pairs = new ArrayList<>(map.size());
for (Map.Entry<String, String> entry : map.entrySet()) {
SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder builder =
Expand Down
Expand Up @@ -23,7 +23,7 @@

public interface SchemaStorage {

CompletableFuture<SchemaVersion> put(String key, byte[] value);
CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);

CompletableFuture<StoredSchema> get(String key, SchemaVersion version);

Expand Down