diff --git a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java index 53abea9..3c6c8db 100644 --- a/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java +++ b/src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java @@ -87,8 +87,7 @@ public void prepareRun(BatchSourceContext context) { @Override public void initialize(BatchRuntimeContext context) throws Exception { super.initialize(context); - SnowflakeSourceAccessor snowflakeAccessor = new SnowflakeSourceAccessor(config); - Schema schema = SchemaHelper.getSchema(snowflakeAccessor, config.getImportQuery()); + Schema schema = SchemaHelper.getSchema(config, context.getFailureCollector()); this.transformer = new SnowflakeMapToRecordTransformer(schema); } diff --git a/src/test/java/io/cdap/plugin/snowflake/common/util/SchemaHelperTest.java b/src/test/java/io/cdap/plugin/snowflake/common/util/SchemaHelperTest.java index 9a40530..63bf0af 100644 --- a/src/test/java/io/cdap/plugin/snowflake/common/util/SchemaHelperTest.java +++ b/src/test/java/io/cdap/plugin/snowflake/common/util/SchemaHelperTest.java @@ -184,4 +184,21 @@ public void testGetSchemaWhenMacroIsEnabledSchemaIsNull() { Assert.assertNull(actual); } + + @Test + public void testGetSchemaManuallyUpdatedTheSchema() { + Schema expected = Schema.recordOf("test", + Schema.Field.of("test_field", Schema.nullableOf(Schema.of(Schema.Type.LONG))) + ); + + SnowflakeBatchSourceConfig mockConfig = Mockito.mock(SnowflakeBatchSourceConfig.class); + Mockito.when(mockConfig.canConnect()).thenReturn(false); + Mockito.when(mockConfig.getSchema()).thenReturn(expected.toString()); + + MockFailureCollector collector = new MockFailureCollector(MOCK_STAGE); + Schema actual = SchemaHelper.getSchema(mockConfig, collector); + + Assert.assertTrue(collector.getValidationFailures().isEmpty()); + Assert.assertEquals(expected, actual); + } }