CDAP-16994 fix schema equality check during pipeline deployment #12364

Merged
@@ -41,7 +41,11 @@ public final class SchemaHash implements Serializable {
private String hashStr;

public SchemaHash(Schema schema) {
hash = computeHash(schema);
this(schema, true);
}

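/**
 * Creates a hash of the given schema.
 *
 * @param schema {@link Schema} to hash.
 * @param includeRecordName whether record names contribute to the hash; when false,
 *   structurally identical records with different names produce the same hash.
 */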
public SchemaHash(Schema schema, boolean includeRecordName) {
hash = computeHash(schema, includeRecordName);
}

/**
@@ -92,10 +96,10 @@ public String toString() {
return str;
}

private byte[] computeHash(Schema schema) {
private byte[] computeHash(Schema schema, boolean includeRecordName) {
try {
Set<String> knownRecords = new HashSet<>();
MessageDigest md5 = updateHash(MessageDigest.getInstance("MD5"), schema, knownRecords);
MessageDigest md5 = updateHash(MessageDigest.getInstance("MD5"), schema, knownRecords, includeRecordName);
return md5.digest();
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
@@ -108,9 +112,11 @@ private byte[] computeHash(Schema schema) {
* @param md5 {@link MessageDigest} to update.
* @param schema {@link Schema} for updating the md5.
* @param knownRecords names of records that have already been hashed, used to avoid
*   recursing into record schemas that have been seen before.
* @param includeRecordName whether to include the record name in the hash.
* @return The same {@link MessageDigest} in the parameter.
*/
private MessageDigest updateHash(MessageDigest md5, Schema schema, Set<String> knownRecords) {
private MessageDigest updateHash(MessageDigest md5, Schema schema, Set<String> knownRecords,
boolean includeRecordName) {
// Don't use enum.ordinal() as ordering in enum could change
switch (schema.getType()) {
case NULL:
@@ -145,28 +151,30 @@ private MessageDigest updateHash(MessageDigest md5, Schema schema, Set<String> k
break;
case ARRAY:
md5.update((byte) 9);
updateHash(md5, schema.getComponentSchema(), knownRecords);
updateHash(md5, schema.getComponentSchema(), knownRecords, includeRecordName);
break;
case MAP:
md5.update((byte) 10);
updateHash(md5, schema.getMapSchema().getKey(), knownRecords);
updateHash(md5, schema.getMapSchema().getValue(), knownRecords);
updateHash(md5, schema.getMapSchema().getKey(), knownRecords, includeRecordName);
updateHash(md5, schema.getMapSchema().getValue(), knownRecords, includeRecordName);
break;
case RECORD:
md5.update((byte) 11);
md5.update(UTF_8.encode(schema.getRecordName()));
if (includeRecordName) {
md5.update(UTF_8.encode(schema.getRecordName()));
}
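// Set.add returns true when the record name was not already present, so notKnown == true
// means this record is being seen for the first time and its field schemas must be hashed.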
boolean notKnown = knownRecords.add(schema.getRecordName());
for (Schema.Field field : schema.getFields()) {
md5.update(UTF_8.encode(field.getName()));
if (notKnown) {
updateHash(md5, field.getSchema(), knownRecords);
updateHash(md5, field.getSchema(), knownRecords, includeRecordName);
}
}
break;
case UNION:
md5.update((byte) 12);
for (Schema unionSchema : schema.getUnionSchemas()) {
updateHash(md5, unionSchema, knownRecords);
updateHash(md5, unionSchema, knownRecords, includeRecordName);
}
break;
}
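In effect, the new flag lets two structurally identical records hash equal even when their record names differ. A minimal sketch of the observable behavior (illustrative only, using the Schema API exercised in the tests below):

Schema s1 = Schema.recordOf("s1", Schema.Field.of("a", Schema.of(Schema.Type.STRING)));
Schema s2 = Schema.recordOf("s2", Schema.Field.of("a", Schema.of(Schema.Type.STRING)));
new SchemaHash(s1).equals(new SchemaHash(s2));               // false: record names differ
new SchemaHash(s1, false).equals(new SchemaHash(s2, false)); // true: names are excluded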
@@ -0,0 +1,36 @@
/*
* Copyright © 2020 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.etl.common;

import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.data.schema.SchemaHash;

/**
* Schema utility methods
*/
public class Schemas {

private Schemas() {
// no-op constructor for utility class
}

public static boolean equalsIgnoringRecordName(Schema s1, Schema s2) {
SchemaHash hash1 = new SchemaHash(s1, false);
SchemaHash hash2 = new SchemaHash(s2, false);
return hash1.equals(hash2);
}
}
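Usage is a single static call. Note that both arguments are assumed non-null here (nothing in SchemaHash guards against null), which is why the hasSameSchema caller below adds its own null checks:

Schema lhs = Schema.recordOf("out1", Schema.Field.of("id", Schema.of(Schema.Type.INT)));
Schema rhs = Schema.recordOf("out2", Schema.Field.of("id", Schema.of(Schema.Type.INT)));
Schemas.equalsIgnoringRecordName(lhs, rhs); // true: schemas differ only in record name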
@@ -19,6 +19,7 @@
import io.cdap.cdap.api.common.Bytes;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.common.Schemas;

import java.lang.reflect.Array;
import java.nio.ByteBuffer;
@@ -153,7 +154,7 @@ private boolean matchesSchema(Object val, Schema schema) {
case RECORD:
if (val instanceof StructuredRecord) {
StructuredRecord s = (StructuredRecord) val;
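// compare record-name-insensitively: a record whose schema differs from the
// expected schema only in record name still counts as a match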
return s.getSchema().equals(schema);
return Schemas.equalsIgnoringRecordName(s.getSchema(), schema);
} else {
return false;
}
@@ -55,6 +55,7 @@
import io.cdap.cdap.etl.common.DefaultAutoJoinerContext;
import io.cdap.cdap.etl.common.DefaultPipelineConfigurer;
import io.cdap.cdap.etl.common.DefaultStageConfigurer;
import io.cdap.cdap.etl.common.Schemas;
import io.cdap.cdap.etl.planner.Dag;
import io.cdap.cdap.etl.proto.ArtifactSelectorConfig;
import io.cdap.cdap.etl.proto.Connection;
@@ -241,7 +242,7 @@ private ConfiguredStage configureStage(ETLStage stage, ValidatedPipeline validat
// this check isn't perfect; we should still error if two transforms are inputs
// and one has a null schema while the other has a non-null schema.
// todo: fix this cleanly and fully
if (outputSchema != null && !outputSchema.equals(schema)) {
if (outputSchema != null && !Schemas.equalsIgnoringRecordName(outputSchema, schema)) {
throw new IllegalArgumentException("Cannot have different input schemas going into stage " + stageName);
}
outputSchema = schema;
@@ -25,6 +25,7 @@
import io.cdap.cdap.etl.api.condition.Condition;
import io.cdap.cdap.etl.common.DefaultPipelineConfigurer;
import io.cdap.cdap.etl.common.DefaultStageConfigurer;
import io.cdap.cdap.etl.common.Schemas;
import io.cdap.cdap.etl.proto.v2.spec.StageSpec;

import java.util.Map;
@@ -111,9 +112,18 @@ private Schema getNextStageInputSchema(StageSpec currentStageSpec, String nextSt
}
}

private boolean hasSameSchema(Map<String, Schema> inputSchemas, Schema inputSchema) {
private boolean hasSameSchema(Map<String, Schema> inputSchemas, @Nullable Schema inputSchema) {
if (!inputSchemas.isEmpty()) {
return Objects.equals(inputSchemas.values().iterator().next(), inputSchema);
Schema s = inputSchemas.values().iterator().next();
if (s == null && inputSchema == null) {
return true;
} else if (s == null) {
return false;
} else if (inputSchema == null) {
return false;
}

return Schemas.equalsIgnoringRecordName(s, inputSchema);
}
return true;
}
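The null branches inside hasSameSchema are equivalent to this more compact form (illustrative only, not part of the change):

return s == null || inputSchema == null ? s == inputSchema
                                        : Schemas.equalsIgnoringRecordName(s, inputSchema);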
@@ -0,0 +1,97 @@
/*
* Copyright © 2020 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.etl.common;

import io.cdap.cdap.api.data.schema.Schema;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

/**
* Tests for {@link Schemas}
*/
public class SchemasTest {

@Test
public void testEqualsIgnoringRecordNameFlat() {
List<Schema.Field> fields = new ArrayList<>();
fields.add(Schema.Field.of("a", Schema.of(Schema.Type.STRING)));
fields.add(Schema.Field.of("b", Schema.of(Schema.Type.BOOLEAN)));
Schema s1 = Schema.recordOf("s1", fields);
Schema s2 = Schema.recordOf("s2", fields);
Assert.assertNotEquals(s1, s2);
Assert.assertTrue(Schemas.equalsIgnoringRecordName(s1, s2));
}

@Test
public void testEqualsIgnoringRecordNameInsideRecord() {
Schema inner1 = Schema.recordOf("inner1", Schema.Field.of("x", Schema.of(Schema.Type.INT)));
Schema inner2 = Schema.recordOf("inner2", Schema.Field.of("x", Schema.of(Schema.Type.INT)));

Schema outer1 = Schema.recordOf("outer1", Schema.Field.of("inner", inner1));
Schema outer2 = Schema.recordOf("outer2", Schema.Field.of("inner", inner2));
Assert.assertNotEquals(outer1, outer2);
Assert.assertTrue(Schemas.equalsIgnoringRecordName(outer1, outer2));
}

@Test
public void testEqualsIgnoringRecordNameInsideUnion() {
Schema inner1 = Schema.recordOf("inner1", Schema.Field.of("x", Schema.of(Schema.Type.INT)));
Schema inner2 = Schema.recordOf("inner2", Schema.Field.of("x", Schema.of(Schema.Type.INT)));

Schema s1 = Schema.recordOf("s1", Schema.Field.of("u", Schema.unionOf(inner1, Schema.of(Schema.Type.NULL))));
Schema s2 = Schema.recordOf("s2", Schema.Field.of("u", Schema.unionOf(inner2, Schema.of(Schema.Type.NULL))));
Assert.assertNotEquals(s1, s2);
Assert.assertTrue(Schemas.equalsIgnoringRecordName(s1, s2));

Schema inner3 = Schema.recordOf("inner3", Schema.Field.of("x", Schema.of(Schema.Type.LONG)));
Schema s3 = Schema.recordOf("s3", Schema.Field.of("u", Schema.unionOf(inner3, Schema.of(Schema.Type.NULL))));
Assert.assertFalse(Schemas.equalsIgnoringRecordName(s1, s3));
}

@Test
public void testEqualsIgnoringRecordNameInsideMap() {
Schema inner1 = Schema.recordOf("inner1", Schema.Field.of("x", Schema.of(Schema.Type.INT)));
Schema inner2 = Schema.recordOf("inner2", Schema.Field.of("x", Schema.of(Schema.Type.INT)));

Schema s1 = Schema.recordOf("s1", Schema.Field.of("u", Schema.mapOf(Schema.of(Schema.Type.STRING), inner1)));
Schema s2 = Schema.recordOf("s2", Schema.Field.of("u", Schema.mapOf(Schema.of(Schema.Type.STRING), inner2)));
Assert.assertNotEquals(s1, s2);
Assert.assertTrue(Schemas.equalsIgnoringRecordName(s1, s2));

Schema inner3 = Schema.recordOf("inner3", Schema.Field.of("x", Schema.of(Schema.Type.LONG)));
Schema s3 = Schema.recordOf("s3", Schema.Field.of("u", Schema.mapOf(Schema.of(Schema.Type.STRING), inner3)));
Assert.assertFalse(Schemas.equalsIgnoringRecordName(s1, s3));
}

@Test
public void testEqualsIgnoringRecordNameInsideArray() {
Schema inner1 = Schema.recordOf("inner1", Schema.Field.of("x", Schema.of(Schema.Type.INT)));
Schema inner2 = Schema.recordOf("inner2", Schema.Field.of("x", Schema.of(Schema.Type.INT)));

Schema s1 = Schema.recordOf("s1", Schema.Field.of("u", Schema.arrayOf(inner1)));
Schema s2 = Schema.recordOf("s2", Schema.Field.of("u", Schema.arrayOf(inner2)));
Assert.assertNotEquals(s1, s2);
Assert.assertTrue(Schemas.equalsIgnoringRecordName(s1, s2));

Schema inner3 = Schema.recordOf("inner3", Schema.Field.of("x", Schema.of(Schema.Type.LONG)));
Schema s3 = Schema.recordOf("s3", Schema.Field.of("u", Schema.arrayOf(inner3)));
Assert.assertFalse(Schemas.equalsIgnoringRecordName(s1, s3));
}
}
@@ -69,13 +69,15 @@
*/
public class PipelineSpecGeneratorTest {
private static final Schema SCHEMA_A = Schema.recordOf("a", Schema.Field.of("a", Schema.of(Schema.Type.STRING)));
private static final Schema SCHEMA_A2 = Schema.recordOf("a2", Schema.Field.of("a", Schema.of(Schema.Type.STRING)));
private static final Schema SCHEMA_B = Schema.recordOf("b", Schema.Field.of("b", Schema.of(Schema.Type.STRING)));
private static final Schema SCHEMA_ABC = Schema.recordOf("abc",
Schema.Field.of("a", Schema.of(Schema.Type.STRING)),
Schema.Field.of("b", Schema.of(Schema.Type.STRING)),
Schema.Field.of("c", Schema.of(Schema.Type.INT)));
private static final Map<String, String> EMPTY_MAP = ImmutableMap.of();
private static final ETLPlugin MOCK_SOURCE = new ETLPlugin("mocksource", BatchSource.PLUGIN_TYPE, EMPTY_MAP);
private static final ETLPlugin MOCK_SOURCE2 = new ETLPlugin("mocksource2", BatchSource.PLUGIN_TYPE, EMPTY_MAP);
private static final ETLPlugin MOCK_TRANSFORM_A = new ETLPlugin("mockA", Transform.PLUGIN_TYPE, EMPTY_MAP);
private static final ETLPlugin MOCK_TRANSFORM_B = new ETLPlugin("mockB", Transform.PLUGIN_TYPE, EMPTY_MAP);
private static final ETLPlugin MOCK_TRANSFORM_ABC = new ETLPlugin("mockABC", Transform.PLUGIN_TYPE, EMPTY_MAP);
@@ -98,6 +100,8 @@ public static void setupTests() {
Set<ArtifactId> artifactIds = ImmutableSet.of(ARTIFACT_ID);
pluginConfigurer.addMockPlugin(BatchSource.PLUGIN_TYPE, "mocksource",
MockPlugin.builder().setOutputSchema(SCHEMA_A).build(), artifactIds);
pluginConfigurer.addMockPlugin(BatchSource.PLUGIN_TYPE, "mocksource2",
MockPlugin.builder().setOutputSchema(SCHEMA_A2).build(), artifactIds);
pluginConfigurer.addMockPlugin(Transform.PLUGIN_TYPE, "mockA",
MockPlugin.builder().setOutputSchema(SCHEMA_A).setErrorSchema(SCHEMA_B).build(),
artifactIds);
@@ -121,7 +125,6 @@ public static void setupTests() {
Engine.MAPREDUCE);
}


@Test(expected = IllegalArgumentException.class)
public void testUniqueStageNames() throws ValidationException {
ETLBatchConfig etlConfig = ETLBatchConfig.builder()
@@ -292,6 +295,44 @@ public void testGenerateSpec() throws ValidationException {
Assert.assertEquals(expected, actual);
}

@Test
public void testInputSchemasWithDifferentName() {
ETLBatchConfig etlConfig = ETLBatchConfig.builder()
.addStage(new ETLStage("s1", MOCK_SOURCE))
.addStage(new ETLStage("s2", MOCK_SOURCE2))
.addStage(new ETLStage("sink", MOCK_SINK))
.addConnection("s1", "sink")
.addConnection("s2", "sink")
.setNumOfRecordsPreview(100)
.build();

Map<String, String> emptyMap = Collections.emptyMap();
PipelineSpec expected = BatchPipelineSpec.builder()
.addStage(
StageSpec.builder("s1", new PluginSpec(BatchSource.PLUGIN_TYPE, "mocksource", emptyMap, ARTIFACT_ID))
.addOutput(SCHEMA_A, "sink")
.build())
.addStage(
StageSpec.builder("s2", new PluginSpec(BatchSource.PLUGIN_TYPE, "mocksource2", emptyMap, ARTIFACT_ID))
.addOutput(SCHEMA_A2, "sink")
.build())
.addStage(
StageSpec.builder("sink", new PluginSpec(BatchSink.PLUGIN_TYPE, "mocksink", emptyMap, ARTIFACT_ID))
.addInputSchemas(ImmutableMap.of("s1", SCHEMA_A, "s2", SCHEMA_A2))
.setErrorSchema(SCHEMA_A)
.build())
.addConnections(etlConfig.getConnections())
.setResources(etlConfig.getResources())
.setDriverResources(new Resources(1024, 1))
.setClientResources(new Resources(1024, 1))
.setStageLoggingEnabled(etlConfig.isStageLoggingEnabled())
.setNumOfRecordsPreview(etlConfig.getNumOfRecordsPreview())
.build();

PipelineSpec actual = specGenerator.generateSpec(etlConfig);
Assert.assertEquals(expected, actual);
}

@Test
public void testDifferentInputSchemasForAction() throws ValidationException {
/*
@@ -55,6 +55,7 @@
import io.cdap.cdap.etl.common.NoopStageStatisticsCollector;
import io.cdap.cdap.etl.common.PipelinePhase;
import io.cdap.cdap.etl.common.RecordInfo;
import io.cdap.cdap.etl.common.Schemas;
import io.cdap.cdap.etl.common.StageStatisticsCollector;
import io.cdap.cdap.etl.proto.v2.spec.StageSpec;
import io.cdap.cdap.etl.spark.function.AlertPassFilter;
@@ -556,7 +557,7 @@ static List<Schema> deriveKeySchema(String joinerStageName, Map<String, List<Str

Schema existingSchema = keySchema.get(keyFieldNum);
if (existingSchema != null && existingSchema.isSimpleOrNullableSimple() &&
!existingSchema.equals(keyFieldSchema)) {
!Schemas.equalsIgnoringRecordName(existingSchema, keyFieldSchema)) {
// this is an invalid join definition
// this condition is normally checked at deployment time,
// but it will be skipped if the input schema is not known.