Skip to content

Commit

Permalink
IGNITE-16573 Support schema changes concerning externalizability stat…
Browse files Browse the repository at this point in the history
…us (#785)
  • Loading branch information
rpuch committed Apr 20, 2022
1 parent a9e94e1 commit 6df9719
Show file tree
Hide file tree
Showing 10 changed files with 310 additions and 55 deletions.
Expand Up @@ -32,7 +32,6 @@
import java.lang.reflect.Modifier;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
Expand Down Expand Up @@ -137,7 +136,7 @@ private ClassDescriptor externalizable(int descriptorId, Class<? extends Externa
descriptorId,
superClassDescriptor(clazz),
componentTypeDescriptor(clazz),
Collections.emptyList(),
fields(clazz),
new Serialization(
SerializationType.EXTERNALIZABLE,
NO_WRITE_OBJECT,
Expand Down
Expand Up @@ -92,7 +92,8 @@ public DefaultUserObjectMarshaller(ClassDescriptorRegistry localRegistry, ClassD
this::marshalUnshared,
this::unmarshalShared,
this::unmarshalUnshared,
structuredObjectMarshaller
structuredObjectMarshaller,
schemaMismatchHandlers
);

proxyMarshaller = new ProxyMarshaller(this::marshalShared, this::unmarshalShared);
Expand Down Expand Up @@ -326,19 +327,19 @@ private <T> T unmarshalFromInput(
UnmarshallingContext context,
boolean unshared
) throws IOException, UnmarshalException {
ClassDescriptor descriptor = resolveDescriptor(input, declaredType, context);
ClassDescriptor remoteDescriptor = resolveDescriptor(input, declaredType, context);

if (mayHaveObjectIdentity(descriptor)) {
if (mayHaveObjectIdentity(remoteDescriptor)) {
int objectId = peekObjectId(input, context);
if (context.isKnownObjectId(objectId)) {
// this is a back-reference
return unmarshalReference(input, context, unshared);
}
}

Object readObject = readObject(input, context, descriptor, unshared);
Object readObject = readObject(input, context, remoteDescriptor, unshared);

@SuppressWarnings("unchecked") T resolvedObject = (T) applyReadResolveIfNeeded(readObject, descriptor);
@SuppressWarnings("unchecked") T resolvedObject = (T) applyReadResolveIfNeeded(readObject, remoteDescriptor);
return resolvedObject;
}

Expand Down Expand Up @@ -374,14 +375,14 @@ private <T> T unmarshalReference(DataInput input, UnmarshallingContext context,
}

@Nullable
private Object readObject(IgniteDataInput input, UnmarshallingContext context, ClassDescriptor descriptor, boolean unshared)
private Object readObject(IgniteDataInput input, UnmarshallingContext context, ClassDescriptor remoteDescriptor, boolean unshared)
throws IOException, UnmarshalException {
if (!mayHaveObjectIdentity(descriptor)) {
return readValue(input, descriptor, context);
} else if (mustBeReadInOneStage(descriptor)) {
return readIdentifiableInOneStage(input, descriptor, context, unshared);
if (!mayHaveObjectIdentity(remoteDescriptor)) {
return readValue(input, remoteDescriptor, context);
} else if (mustBeReadInOneStage(remoteDescriptor)) {
return readIdentifiableInOneStage(input, remoteDescriptor, context, unshared);
} else {
return readIdentifiableInTwoStages(input, descriptor, context, unshared);
return readIdentifiableInTwoStages(input, remoteDescriptor, context, unshared);
}
}

Expand Down Expand Up @@ -410,55 +411,56 @@ private int readObjectId(DataInput input) throws IOException {

private Object readIdentifiableInTwoStages(
IgniteDataInput input,
ClassDescriptor descriptor,
ClassDescriptor remoteDescriptor,
UnmarshallingContext context,
boolean unshared
) throws IOException, UnmarshalException {
int objectId = readObjectId(input);

Object preInstantiatedObject = preInstantiate(descriptor, input, context);
Object preInstantiatedObject = preInstantiate(remoteDescriptor, input, context);
context.registerReference(objectId, preInstantiatedObject, unshared);

fillObjectFrom(input, preInstantiatedObject, descriptor, context);
fillObjectFrom(input, preInstantiatedObject, remoteDescriptor, context);

return preInstantiatedObject;
}

private Object preInstantiate(ClassDescriptor descriptor, IgniteDataInput input, UnmarshallingContext context)
private Object preInstantiate(ClassDescriptor remoteDescriptor, IgniteDataInput input, UnmarshallingContext context)
throws IOException, UnmarshalException {
if (isBuiltInNonContainer(descriptor)) {
throw new IllegalStateException("Should not be here, descriptor is " + descriptor);
} else if (isBuiltInCollection(descriptor)) {
return builtInContainerMarshallers.preInstantiateBuiltInMutableCollection(descriptor, input, context);
} else if (isBuiltInMap(descriptor)) {
return builtInContainerMarshallers.preInstantiateBuiltInMutableMap(descriptor, input, context);
} else if (descriptor.isArray()) {
if (isBuiltInNonContainer(remoteDescriptor)) {
throw new IllegalStateException("Should not be here, descriptor is " + remoteDescriptor);
} else if (isBuiltInCollection(remoteDescriptor)) {
return builtInContainerMarshallers.preInstantiateBuiltInMutableCollection(remoteDescriptor, input, context);
} else if (isBuiltInMap(remoteDescriptor)) {
return builtInContainerMarshallers.preInstantiateBuiltInMutableMap(remoteDescriptor, input, context);
} else if (remoteDescriptor.isArray()) {
return builtInContainerMarshallers.preInstantiateGenericRefArray(input, context);
} else if (descriptor.isExternalizable()) {
return externalizableMarshaller.preInstantiateExternalizable(descriptor);
} else if (descriptor.isProxy()) {
} else if (remoteDescriptor.isExternalizable()) {
return externalizableMarshaller.preInstantiateExternalizable(remoteDescriptor);
} else if (remoteDescriptor.isProxy()) {
return proxyMarshaller.preInstantiateProxy(input, context);
} else {
return structuredObjectMarshaller.preInstantiateStructuredObject(descriptor);
return structuredObjectMarshaller.preInstantiateStructuredObject(remoteDescriptor);
}
}

private void fillObjectFrom(IgniteDataInput input, Object objectToFill, ClassDescriptor descriptor, UnmarshallingContext context)
private void fillObjectFrom(IgniteDataInput input, Object objectToFill, ClassDescriptor remoteDescriptor, UnmarshallingContext context)
throws UnmarshalException, IOException {
if (isBuiltInNonContainer(descriptor)) {
throw new IllegalStateException("Cannot fill " + descriptor.className() + ", this is a programmatic error");
} else if (isBuiltInCollection(descriptor)) {
fillBuiltInCollectionFrom(input, (Collection<?>) objectToFill, descriptor, context);
} else if (isBuiltInMap(descriptor)) {
if (isBuiltInNonContainer(remoteDescriptor)) {
throw new IllegalStateException("Cannot fill " + remoteDescriptor.className() + ", this is a programmatic error");
} else if (isBuiltInCollection(remoteDescriptor)) {
fillBuiltInCollectionFrom(input, (Collection<?>) objectToFill, remoteDescriptor, context);
} else if (isBuiltInMap(remoteDescriptor)) {
fillBuiltInMapFrom(input, (Map<?, ?>) objectToFill, context);
} else if (descriptor.isArray()) {
fillGenericRefArrayFrom(input, (Object[]) objectToFill, descriptor, context);
} else if (descriptor.isExternalizable()) {
externalizableMarshaller.fillExternalizableFrom(input, (Externalizable) objectToFill, context);
} else if (descriptor.isProxy()) {
} else if (remoteDescriptor.isArray()) {
fillGenericRefArrayFrom(input, (Object[]) objectToFill, remoteDescriptor, context);
} else if (remoteDescriptor.isExternalizable()) {
externalizableMarshaller.fillFromRemotelyExternalizable(input, objectToFill, context);
} else if (remoteDescriptor.isProxy()) {
proxyMarshaller.fillProxyFrom(input, objectToFill, context);
} else {
structuredObjectMarshaller.fillStructuredObjectFrom(input, objectToFill, descriptor, context);
structuredObjectMarshaller.fillStructuredObjectFrom(input, objectToFill, remoteDescriptor, context);
fireExternalizableMissedIfExternalizableLocally(objectToFill);
}
}

Expand Down Expand Up @@ -517,6 +519,12 @@ private void throwIfNotDrained(InputStream dis) throws IOException, UnmarshalExc
}
}

private void fireExternalizableMissedIfExternalizableLocally(Object objectToFill) throws SchemaMismatchException {
if (objectToFill instanceof Externalizable) {
schemaMismatchHandlers.onExternalizableMissed(objectToFill);
}
}

/** {@inheritDoc} */
@Override
public <T> void replaceSchemaMismatchHandler(Class<T> layerClass, SchemaMismatchHandler<T> handler) {
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.ignite.internal.network.serialization.ClassDescriptor;
import org.apache.ignite.internal.util.io.IgniteDataInput;
import org.apache.ignite.internal.util.io.IgniteDataOutput;
import org.apache.ignite.internal.util.io.IgniteUnsafeDataInput;

/**
* (Um)marshalling specific to EXTERNALIZABLE serialization type.
Expand All @@ -33,20 +34,24 @@ class ExternalizableMarshaller {
private final TypedValueWriter unsharedWriter;
private final DefaultFieldsReaderWriter defaultFieldsReaderWriter;

private final SchemaMismatchHandlers schemaMismatchHandlers;

private final NoArgConstructorInstantiation instantiation = new NoArgConstructorInstantiation();

ExternalizableMarshaller(
TypedValueWriter typedValueWriter,
TypedValueWriter unsharedWriter,
TypedValueReader valueReader,
TypedValueReader unsharedReader,
DefaultFieldsReaderWriter defaultFieldsReaderWriter
DefaultFieldsReaderWriter defaultFieldsReaderWriter,
SchemaMismatchHandlers schemaMismatchHandlers
) {
this.valueWriter = typedValueWriter;
this.unsharedWriter = unsharedWriter;
this.valueReader = valueReader;
this.unsharedReader = unsharedReader;
this.defaultFieldsReaderWriter = defaultFieldsReaderWriter;
this.schemaMismatchHandlers = schemaMismatchHandlers;
}

void writeExternalizable(Externalizable externalizable, ClassDescriptor descriptor, IgniteDataOutput output, MarshallingContext context)
Expand All @@ -65,23 +70,50 @@ private void externalizeTo(Externalizable externalizable, IgniteDataOutput outpu
context.endWritingWithWriteObject();

try {
externalizable.writeExternal(oos);
oos.flush();
writeWithLength(externalizable, oos);
} finally {
oos.restoreCurrentPutFieldTo(oldPut);
}
}

@SuppressWarnings("unchecked")
<T extends Externalizable> T preInstantiateExternalizable(ClassDescriptor descriptor) throws UnmarshalException {
private void writeWithLength(Externalizable externalizable, UosObjectOutputStream oos) throws IOException {
// NB: this only works with purely in-memory IgniteDataInput implementations!

int offsetBefore = oos.memoryBufferOffset();

writeLengthPlaceholder(oos);

externalizable.writeExternal(oos);
oos.flush();

int externalDataLength = oos.memoryBufferOffset() - offsetBefore - Integer.BYTES;

oos.writeIntAtOffset(offsetBefore, externalDataLength);
}

private void writeLengthPlaceholder(UosObjectOutputStream oos) throws IOException {
oos.writeInt(0);
}

Object preInstantiateExternalizable(ClassDescriptor descriptor) throws UnmarshalException {
try {
return (T) instantiation.newInstance(descriptor.localClass());
return instantiation.newInstance(descriptor.localClass());
} catch (InstantiationException e) {
throw new UnmarshalException("Cannot instantiate " + descriptor.className(), e);
}
}

<T extends Externalizable> void fillExternalizableFrom(IgniteDataInput input, T object, UnmarshallingContext context)
void fillFromRemotelyExternalizable(IgniteDataInput input, Object object, UnmarshallingContext context)
throws UnmarshalException, IOException {
if (object instanceof Externalizable) {
fillExternalizableFrom(input, (Externalizable) object, context);
} else {
// it was serialized as an Externalizable, but locally it is not Externalizable; delegate to handler
fireExternalizableIgnored(object, input, context);
}
}

private <T extends Externalizable> void fillExternalizableFrom(IgniteDataInput input, T object, UnmarshallingContext context)
throws IOException, UnmarshalException {
// Do not close the stream yet!
UosObjectInputStream ois = context.objectInputStream(input, valueReader, unsharedReader, defaultFieldsReaderWriter);
Expand All @@ -90,11 +122,35 @@ <T extends Externalizable> void fillExternalizableFrom(IgniteDataInput input, T
context.endReadingWithReadObject();

try {
object.readExternal(ois);
readFramed(object, ois);
} catch (ClassNotFoundException e) {
throw new UnmarshalException("Cannot unmarshal due to a missing class", e);
} finally {
ois.restoreCurrentGetFieldTo(oldGet);
}
}

private <T extends Externalizable> void readFramed(T object, UosObjectInputStream ois) throws IOException, ClassNotFoundException {
skipExternalDataLength(ois);

object.readExternal(ois);
}

private void skipExternalDataLength(UosObjectInputStream ois) throws IOException {
ois.readInt();
}

private void fireExternalizableIgnored(Object object, IgniteDataInput input, UnmarshallingContext context)
throws SchemaMismatchException, IOException {
// We have additional allocations and copying here. It simplifies the code a lot, and it seems that we should
// not optimize for this rare corner case.

int externalDataLength = input.readInt();
byte[] externalDataBytes = input.readByteArray(externalDataLength);
IgniteDataInput externalDataInput = new IgniteUnsafeDataInput(externalDataBytes);

try (var oos = new UosObjectInputStream(externalDataInput, valueReader, unsharedReader, defaultFieldsReaderWriter, context)) {
schemaMismatchHandlers.onExternalizableIgnored(object, oos);
}
}
}
Expand Up @@ -17,6 +17,9 @@

package org.apache.ignite.internal.network.serialization.marshal;

import java.io.ObjectInput;
import java.io.ObjectOutput;

/**
* Handles situations when the schema that was used to serialize an object (remote schema) is different from the
* schema used to deserialize the object (local schema).
Expand Down Expand Up @@ -65,6 +68,31 @@ default void onFieldMissed(T instance, String fieldName) throws SchemaMismatchEx
* @throws SchemaMismatchException thrown if the handler wills to stop the deserialization with an error
*/
default void onFieldTypeChanged(T instance, String fieldName, Class<?> remoteType, Object fieldValue) throws SchemaMismatchException {
throw new SchemaMismatchException(fieldName + " type changed, serialized as " + remoteType.getName() + ", value " + fieldValue);
throw new SchemaMismatchException(fieldName + " type changed, serialized as " + remoteType.getName() + ", value " + fieldValue
+ " of type " + fieldName.getClass().getName());
}

/**
* Called when a remote class implements {@link java.io.Externalizable}, but local class does not.
*
* @param instance the object that was constructed, but not yet filled
* @param externalData externalized data that represents the object (it was written
* using {@link java.io.Externalizable#writeExternal(ObjectOutput)}
* @throws SchemaMismatchException thrown if the handler wills to stop the deserialization with an error
*/
default void onExternalizableIgnored(T instance, ObjectInput externalData) throws SchemaMismatchException {
throw new SchemaMismatchException("Class " + instance.getClass().getName()
+ " was serialized as an Externalizable remotely, but locally it is not an Externalizable");
}

/**
* Called when a remote class does not implement {@link java.io.Externalizable}, but local class does. The method is called after
* all read fields are assigned to the instance.
*
* @param instance the instance that has already been filled
* @throws SchemaMismatchException thrown if the handler wills to stop the deserialization with an error
*/
default void onExternalizableMissed(T instance) throws SchemaMismatchException {
// no-op
}
}
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.network.serialization.marshal;

import java.io.ObjectInput;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -52,4 +53,12 @@ void onFieldTypeChanged(Class<?> layerClass, Object instance, String fieldName,
throws SchemaMismatchException {
handlerFor(layerClass).onFieldTypeChanged(instance, fieldName, remoteType, fieldValue);
}

void onExternalizableIgnored(Object instance, ObjectInput externalData) throws SchemaMismatchException {
handlerFor(instance.getClass()).onExternalizableIgnored(instance, externalData);
}

void onExternalizableMissed(Object instance) throws SchemaMismatchException {
handlerFor(instance.getClass()).onExternalizableMissed(instance);
}
}
Expand Up @@ -276,7 +276,7 @@ public ObjectStreamClass getObjectStreamClass() {

/** {@inheritDoc} */
@Override
public boolean defaulted(String name) throws IOException {
public boolean defaulted(String name) {
// TODO: IGNITE-16571 - actually take into account whether it's defaulted or not
return false;
}
Expand Down
Expand Up @@ -237,6 +237,21 @@ void restoreCurrentPutFieldTo(UosPutField newPut) {
currentPut = newPut;
}

int memoryBufferOffset() {
return output.offset();
}

void writeIntAtOffset(int offset, int value) throws IOException {
int oldOffset = output.offset();

try {
output.offset(offset);
output.writeInt(value);
} finally {
output.offset(oldOffset);
}
}

class UosPutField extends PutField {
private final ClassDescriptor descriptor;

Expand Down

0 comments on commit 6df9719

Please sign in to comment.