Skip to content

Commit

Permalink
OGM-1483 Define custom Infinispan Remote Protobuf schema
Browse files Browse the repository at this point in the history
  • Loading branch information
fax4ever authored and DavideD committed Jun 6, 2018
1 parent 153e2bd commit 6be4c28
Show file tree
Hide file tree
Showing 27 changed files with 655 additions and 23 deletions.
24 changes: 24 additions & 0 deletions core/src/main/java/org/hibernate/ogm/util/impl/ResourceHelper.java
@@ -0,0 +1,24 @@
/*
* Hibernate OGM, Domain model persistence for NoSQL datastores
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.ogm.util.impl;

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Scanner;

public class ResourceHelper {

private static final String LINE_DELIMITER = "\\A";

public static String readResource(URL resource) throws IOException {
try ( InputStream is = resource.openStream() ) {
Scanner s = new Scanner( is ).useDelimiter( LINE_DELIMITER );
return s.hasNext() ? s.next() : "";
}
}
}
Expand Up @@ -2406,9 +2406,16 @@ Defines the package name of the generated Protobuf schema. Defaults to `Hibernat
Useful to isolate different applications using the same Infinispan Server instance.

hibernate.ogm.infinispan_remote.schema_file_name::
Defines the file name of the generated Protobuf schema. Defualts to 'Hibernate_OGM_Generated_schema.proto'.
Defines the file name of the generated Protobuf schema. Defaults to 'Hibernate_OGM_Generated_schema.proto'.
The file name must have a valid __*.proto__ extension.

hibernate.ogm.infinispan_remote.schema_override_resource::
It is possible to override the generated Protobuf schema, providing a user defined Protobuf schema resource.
Property value is of string type and it can either represent a *class path element*, an *URL* or a *file system path*.
Hibernate OGM will use the specified Protobuf Schema instead of the generated one.
This doesn't affect how entities are encoded so the specified schema is expected to be compatible with the generated one.
This can be used to defined server side indexes on caches used by JPQL and native Ickle queries.

==== Data encoding: Protobuf Schema

Using the _Infinispan Remote_ backend your data will be encoded using Protocol Buffers,
Expand Down Expand Up @@ -2458,7 +2465,7 @@ using Hibernate OGM.

The deployed schemas can be fetched from the Infinispan Server; Hibernate OGM also
logs the generated schemas at `INFO` level in the logging category
`org.hibernate.ogm.datastore.infinispanremote.impl.protobuf.SchemaDefinitions`.
`org.hibernate.ogm.datastore.infinispanremote.impl.protobuf.schema`.

[[storage-principles-of-infinispan-dataprovider]]

Expand Down
Expand Up @@ -65,6 +65,12 @@ public final class InfinispanRemoteProperties implements KeyValueStoreProperties
*/
public static final String SCHEMA_OVERRIDE_SERVICE = "hibernate.ogm.infinispan_remote.schema_override_service";

/**
* You can provide a Protobuf schema in a resource file to override the Protobuf schema being generated.
* This will not affect how entities are encoded, so the alternative schema must be compatible.
*/
public static final String SCHEMA_OVERRIDE_RESOURCE = "hibernate.ogm.infinispan_remote.schema_override_resource";

/**
* The configuration property key to configure the package name to be used in Protobuf generated schemas.
*/
Expand Down
Expand Up @@ -100,6 +100,8 @@ public class InfinispanRemoteConfiguration {

private SchemaOverride schemaOverrideService;

private URL schemaOverrideResource;

private String schemaPackageName;

private String schemaFileName;
Expand Down Expand Up @@ -137,6 +139,16 @@ public SchemaOverride getSchemaOverrideService() {
return schemaOverrideService;
}

/**
* The location of custom Protobuf schema file.
*
* @see InfinispanRemoteProperties#SCHEMA_OVERRIDE_RESOURCE
* @return might be the name of the file (too look it up in the class path) or an URL to a file.
*/
public URL getSchemaOverrideResource() {
return schemaOverrideResource;
}

public String getSchemaPackageName() {
return schemaPackageName;
}
Expand Down Expand Up @@ -180,6 +192,10 @@ public void initConfiguration(Map<?, ?> configurationMap, ServiceRegistryImpleme
.instantiate()
.getValue();

this.schemaOverrideResource = propertyReader
.property( InfinispanRemoteProperties.SCHEMA_OVERRIDE_RESOURCE, URL.class )
.getValue();

this.schemaPackageName = propertyReader
.property( InfinispanRemoteProperties.SCHEMA_PACKAGE_NAME, String.class )
.withDefault( InfinispanRemoteProperties.DEFAULT_SCHEMA_PACKAGE_NAME )
Expand Down
Expand Up @@ -8,6 +8,7 @@

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.net.URL;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
Expand All @@ -17,7 +18,7 @@
import org.hibernate.ogm.datastore.infinispanremote.impl.cachehandler.HotRodCacheHandler;
import org.hibernate.ogm.datastore.infinispanremote.impl.cachehandler.HotRodCacheCreationHandler;
import org.hibernate.ogm.datastore.infinispanremote.impl.cachehandler.HotRodCacheValidationHandler;
import org.hibernate.ogm.datastore.infinispanremote.impl.protobuf.SchemaDefinitions;
import org.hibernate.ogm.datastore.infinispanremote.impl.protobuf.schema.SchemaDefinitions;
import org.hibernate.ogm.datastore.infinispanremote.impl.protostream.OgmProtoStreamMarshaller;
import org.hibernate.ogm.datastore.infinispanremote.impl.protostream.ProtoDataMapper;
import org.hibernate.ogm.datastore.infinispanremote.impl.protostream.ProtostreamSerializerSetup;
Expand Down Expand Up @@ -84,6 +85,9 @@ public class InfinispanRemoteDatastoreProvider extends BaseDatastoreProvider
@EffectivelyFinal
private SchemaOverride schemaOverrideService;

@EffectivelyFinal
private URL schemaOverrideResource;

//For each cache we have a schema and a set of encoders/decoders to the generated protobuf schema
@EffectivelyFinal
private Map<String,ProtoDataMapper> perCacheSchemaMappers;
Expand Down Expand Up @@ -126,6 +130,7 @@ public void configure(Map configurationValues) {
this.config.initConfiguration( configurationValues, serviceRegistry );
this.schemaCapture = config.getSchemaCaptureService();
this.schemaOverrideService = config.getSchemaOverrideService();
this.schemaOverrideResource = config.getSchemaOverrideResource();
this.schemaPackageName = config.getSchemaPackageName();
this.schemaFileName = config.getSchemaFileName();
this.createCachesEnabled = config.isCreateCachesEnabled();
Expand All @@ -146,7 +151,7 @@ public void registerSchemaDefinitions(SchemaDefinitions sd) {
this.sd = sd;
this.sd.validateSchema();
RemoteCache<String, String> protobufCache = getProtobufCache();
this.sd.deploySchema( schemaFileName, protobufCache, schemaCapture, schemaOverrideService );
this.sd.deploySchema( schemaFileName, protobufCache, schemaCapture, schemaOverrideService, schemaOverrideResource );

// register proto schema also to global serialization context used for unmarshalling
registerProtoFiles( marshaller, sd );
Expand Down Expand Up @@ -201,10 +206,18 @@ public Class<? extends QueryParserService> getDefaultQueryParserServiceType() {
return InfinispanRemoteBasedQueryParserService.class;
}

public URL getSchemaOverrideResource() {
return schemaOverrideResource;
}

public String getProtobufPackageName() {
return schemaPackageName;
}

public String getSchemaFileName() {
return schemaFileName;
}

public String getConfiguration(String cacheName) {
return cacheHandler.getConfiguration( cacheName );
}
Expand Down
Expand Up @@ -14,7 +14,7 @@
import org.hibernate.mapping.Column;
import org.hibernate.mapping.Table;
import org.hibernate.mapping.Value;
import org.hibernate.ogm.datastore.infinispanremote.impl.protobuf.SchemaDefinitions;
import org.hibernate.ogm.datastore.infinispanremote.impl.protobuf.schema.SchemaDefinitions;
import org.hibernate.ogm.datastore.infinispanremote.impl.schema.TableDefinition;
import org.hibernate.ogm.datastore.infinispanremote.options.cache.CacheConfiguration;
import org.hibernate.ogm.datastore.infinispanremote.options.cache.impl.CacheConfigurationOption;
Expand Down
Expand Up @@ -11,6 +11,7 @@
import java.util.Map;

import org.hibernate.ogm.datastore.infinispanremote.impl.VersionedAssociation;
import org.hibernate.ogm.datastore.infinispanremote.impl.protobuf.schema.SchemaDefinitions;
import org.hibernate.ogm.datastore.infinispanremote.impl.protostream.MainOgmCoDec;
import org.hibernate.ogm.datastore.infinispanremote.impl.protostream.ProtostreamAssociationPayload;
import org.hibernate.ogm.datastore.infinispanremote.impl.protostream.ProtostreamId;
Expand Down
Expand Up @@ -19,7 +19,7 @@ public LongProtofieldAccessor(final int tag, final String name, final boolean nu
}

@Override
protected String getProtobufTypeName() {
public String getProtobufTypeName() {
return "int64";
}

Expand Down
Expand Up @@ -48,6 +48,9 @@
import org.hibernate.ogm.type.spi.GridType;
import org.hibernate.type.Type;

import org.infinispan.protostream.descriptors.Descriptor;
import org.infinispan.protostream.descriptors.FieldDescriptor;

public class ProtofieldAccessorSet {

//Counter to assign unique Tag ids in protobuf. First to be assigned is '1'.
Expand Down Expand Up @@ -184,7 +187,7 @@ public void forEachProtostreamMappedField(Consumer<ProtostreamMappedField> actio
orderedFields.forEach( action );
}

private void add(ProtofieldAccessor unsafeWriter) {
private void add(BaseProtofieldAccessor unsafeWriter) {
UnsafeProtofield wrapped = new UnsafeProtofield( unsafeWriter );
UnsafeProtofield previous = fieldsPerORMName.put( unsafeWriter.getColumnName(), wrapped );
if ( previous != null ) {
Expand Down Expand Up @@ -218,4 +221,23 @@ public String[] getColumnNames() {
return fieldsPerORMName.keySet().toArray( new String[0] );
}

public boolean isDescribedIn(Descriptor descriptor) {
for ( UnsafeProtofield<?> field : orderedFields ) {
// if any field is not present the field set is not described
if ( !fieldIsDescribedIn( field, descriptor ) ) {
return false;
}
}
return true;
}

private boolean fieldIsDescribedIn(UnsafeProtofield<?> field, Descriptor descriptor) {
for ( FieldDescriptor fieldDescriptor : descriptor.getFields() ) {
// find at least one match
if ( field.isDescribedIn( fieldDescriptor ) ) {
return true;
}
}
return false;
}
}
Expand Up @@ -19,7 +19,7 @@ public StringProtofieldAccessor(final int tag, String name, boolean nullable, fi
}

@Override
protected String getProtobufTypeName() {
public String getProtobufTypeName() {
return "string";
}

Expand Down
Expand Up @@ -12,18 +12,20 @@
import org.hibernate.ogm.datastore.infinispanremote.impl.protostream.ProtostreamMappedField;
import org.hibernate.ogm.datastore.infinispanremote.impl.schema.ProtobufFieldExporter;
import org.hibernate.ogm.datastore.infinispanremote.impl.schema.ProtobufTypeExporter;

import org.infinispan.protostream.MessageMarshaller.ProtoStreamReader;
import org.infinispan.protostream.MessageMarshaller.ProtoStreamWriter;
import org.infinispan.protostream.descriptors.FieldDescriptor;

/**
* Catching all IOException cases makes usage of lambdas inconvenient.
* Wrap all ProtofieldAccessor instances with this for convenience.
*/
final class UnsafeProtofield<T> implements ProtobufFieldExporter, ProtobufTypeExporter, ProtostreamMappedField<T> {

private final ProtofieldAccessor<T> delegate;
private final BaseProtofieldAccessor<T> delegate;

UnsafeProtofield(ProtofieldAccessor<T> delegate) {
UnsafeProtofield(BaseProtofieldAccessor<T> delegate) {
Objects.requireNonNull( delegate );
this.delegate = delegate;
}
Expand Down Expand Up @@ -73,4 +75,7 @@ public String toString() {
return "UnsafeProtofield [delegate=" + delegate + "]";
}

public boolean isDescribedIn(FieldDescriptor fieldDescriptor) {
return delegate.getProtobufName().equals( fieldDescriptor.getName() ) && delegate.getProtobufTypeName().equals( fieldDescriptor.getTypeName() );
}
}
Expand Up @@ -4,17 +4,19 @@
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.ogm.datastore.infinispanremote.impl.protobuf;
package org.hibernate.ogm.datastore.infinispanremote.impl.protobuf.schema;

import java.io.IOException;
import java.io.StringReader;
import java.lang.invoke.MethodHandles;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.hibernate.AssertionFailure;
import org.hibernate.ogm.datastore.infinispanremote.impl.InfinispanRemoteDatastoreProvider;
import org.hibernate.ogm.datastore.infinispanremote.impl.protobuf.TypeDeclarationsCollector;
import org.hibernate.ogm.datastore.infinispanremote.impl.protostream.OgmProtoStreamMarshaller;
import org.hibernate.ogm.datastore.infinispanremote.impl.protostream.ProtoDataMapper;
import org.hibernate.ogm.datastore.infinispanremote.impl.schema.SequenceTableDefinition;
Expand All @@ -33,10 +35,10 @@ public class SchemaDefinitions {

private static final Log LOG = LoggerFactory.make( MethodHandles.lookup() );

private final String packageName;
private final Map<String,TableDefinition> definitionsByTableName = new HashMap<>();
private final Map<IdSourceKeyMetadata, SequenceTableDefinition> idSchemaPerMetadata = new HashMap<>();
private final Map<String, SequenceTableDefinition> idSchemaPerName = new HashMap<>();
final String packageName;
final Map<String,TableDefinition> definitionsByTableName = new HashMap<>();
final Map<IdSourceKeyMetadata, SequenceTableDefinition> idSchemaPerMetadata = new HashMap<>();
final Map<String, SequenceTableDefinition> idSchemaPerName = new HashMap<>();

//guarded by synchronization on this
private String cachedSchema = null;
Expand All @@ -49,11 +51,18 @@ public SchemaDefinitions(String packageName) {
// (both the schema definitions and the key/value pairs)
// This resource is defined in the Protostream jar.
// Typically this is transparently handled by using the Protostream codecs but be aware of it when bypassing Protostream.
public void deploySchema(String generatedProtobufName, RemoteCache<String, String> protobufCache, SchemaCapture schemaCapture, SchemaOverride schemaOverrideService,
URL schemaOverrideResource) {
// user defined schema
if ( schemaOverrideService != null || schemaOverrideResource != null ) {
cachedSchema = new SchemaValidator( this, schemaOverrideService, schemaOverrideResource, generatedProtobufName ).provideSchema();
}

// or generate them
generateProtoschema();

public void deploySchema(String generatedProtobufName, RemoteCache<String, String> protobufCache, SchemaCapture schemaCapture, SchemaOverride schemaOverrideService) {
final String generatedProtoschema = schemaOverrideService == null ? generateProtoschema() : schemaOverrideService.createProtobufSchema();
try {
protobufCache.put( generatedProtobufName, generatedProtoschema );
protobufCache.put( generatedProtobufName, cachedSchema );
String errors = protobufCache.get( generatedProtobufName + ".errors" );
if ( errors != null ) {
throw LOG.errorAtSchemaDeploy( generatedProtobufName, errors );
Expand All @@ -64,7 +73,7 @@ public void deploySchema(String generatedProtobufName, RemoteCache<String, Strin
throw LOG.errorAtSchemaDeploy( generatedProtobufName, hrce );
}
if ( schemaCapture != null ) {
schemaCapture.put( generatedProtobufName, generatedProtoschema );
schemaCapture.put( generatedProtobufName, cachedSchema );
}
}

Expand Down

0 comments on commit 6be4c28

Please sign in to comment.