diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java index 7277c2e4375..6a8f1f626f8 100644 --- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java +++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java @@ -56,6 +56,10 @@ public class HiveAvroSerDeManager extends HiveSerDeManager { public static final boolean DEFAULT_USE_SCHEMA_FILE = false; public static final String SCHEMA_FILE_NAME = "schema.file.name"; public static final String DEFAULT_SCHEMA_FILE_NAME = "_schema.avsc"; + public static final String SCHEMA_TEMP_FILE_NAME = "schema.temp.file.name"; + public static final String DEFAULT_SCHEMA_TEMP_FILE_NAME = "_schema_temp.avsc"; + public static final String USE_SCHEMA_TEMP_FILE = "use.schema.temp.file"; + public static final boolean DEFAULT_USE_SCHEMA_TEMP_FILE = false; public static final String SCHEMA_LITERAL_LENGTH_LIMIT = "schema.literal.length.limit"; public static final int DEFAULT_SCHEMA_LITERAL_LENGTH_LIMIT = 4000; public static final String HIVE_SPEC_SCHEMA_READING_TIMER = "hiveAvroSerdeManager.schemaReadTimer"; @@ -64,6 +68,8 @@ public class HiveAvroSerDeManager extends HiveSerDeManager { protected final FileSystem fs; protected final boolean useSchemaFile; protected final String schemaFileName; + protected final boolean useSchemaTempFile; + protected final String schemaTempFileName; protected final int schemaLiteralLengthLimit; protected final HiveSerDeWrapper serDeWrapper = HiveSerDeWrapper.get("AVRO"); @@ -79,7 +85,9 @@ public HiveAvroSerDeManager(State props) throws IOException { } this.useSchemaFile = props.getPropAsBoolean(USE_SCHEMA_FILE, DEFAULT_USE_SCHEMA_FILE); + this.useSchemaTempFile = props.getPropAsBoolean(USE_SCHEMA_TEMP_FILE, DEFAULT_USE_SCHEMA_TEMP_FILE); this.schemaFileName = props.getProp(SCHEMA_FILE_NAME, DEFAULT_SCHEMA_FILE_NAME); + this.schemaTempFileName = props.getProp(SCHEMA_TEMP_FILE_NAME, DEFAULT_SCHEMA_TEMP_FILE_NAME); this.schemaLiteralLengthLimit = props.getPropAsInt(SCHEMA_LITERAL_LENGTH_LIMIT, DEFAULT_SCHEMA_LITERAL_LENGTH_LIMIT); @@ -172,7 +180,13 @@ protected void addSchemaFromAvroFile(Schema schema, Path schemaFile, HiveRegistr if (schemaStr.length() <= this.schemaLiteralLengthLimit) { hiveUnit.setSerDeProp(SCHEMA_LITERAL, schema.toString()); } else { - AvroUtils.writeSchemaToFile(schema, schemaFile, this.fs, true); + Path schemaTempFile = null; + + if (useSchemaTempFile) { + schemaTempFile = new Path(schemaFile.getParent(), this.schemaTempFileName); + } + + AvroUtils.writeSchemaToFile(schema, schemaFile, schemaTempFile, this.fs, true); log.info("Using schema file " + schemaFile.toString()); hiveUnit.setSerDeProp(SCHEMA_URL, schemaFile.toString()); } diff --git a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManagerTest.java b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManagerTest.java new file mode 100644 index 00000000000..054ca40eee3 --- /dev/null +++ b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManagerTest.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.hive.avro; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.hive.HiveRegistrationUnit; +import org.apache.gobblin.hive.HiveTable; + + +@Test(singleThreaded = true) +public class HiveAvroSerDeManagerTest { + private static String TEST_DB = "testDB"; + private static String TEST_TABLE = "testTable"; + private Path testBasePath; + + @BeforeClass + public void setUp() throws IOException { + FileSystem fs = FileSystem.getLocal(new Configuration()); + this.testBasePath = new Path("testdir"); + fs.delete(this.testBasePath, true); + fs.delete(this.testBasePath, true); + + fs.mkdirs(this.testBasePath); + + Files.copy(this.getClass().getResourceAsStream("/test-hive-table/hive-test.avro"), + Paths.get(this.testBasePath.toString(), "hive-test.avro"), StandardCopyOption.REPLACE_EXISTING); + } + + /** + * Test that the schema is written to the schema literal + */ + @Test + public void testSchemaLiteral() throws IOException { + State state = new State(); + HiveAvroSerDeManager manager = new HiveAvroSerDeManager(state); + HiveRegistrationUnit registrationUnit = (new HiveTable.Builder()).withDbName(TEST_DB).withTableName(TEST_TABLE).build(); + + manager.addSerDeProperties(this.testBasePath, registrationUnit); + + Assert.assertTrue(registrationUnit.getSerDeProps().getProp(HiveAvroSerDeManager.SCHEMA_LITERAL).contains("example.avro")); + } + + @Test + public void testSchemaUrl() throws IOException { + State state = new State(); + state.setProp(HiveAvroSerDeManager.SCHEMA_LITERAL_LENGTH_LIMIT, "10"); + + validateSchemaUrl(state, HiveAvroSerDeManager.DEFAULT_SCHEMA_FILE_NAME, false); + } + + @Test + public void testSchemaUrlWithExistingFile() throws IOException { + State state = new State(); + state.setProp(HiveAvroSerDeManager.SCHEMA_LITERAL_LENGTH_LIMIT, "10"); + + validateSchemaUrl(state, HiveAvroSerDeManager.DEFAULT_SCHEMA_FILE_NAME, true); + } + + @Test + public void testSchemaUrlWithTempFile() throws IOException { + final String SCHEMA_FILE_NAME = "test_temp.avsc"; + State state = new State(); + state.setProp(HiveAvroSerDeManager.SCHEMA_LITERAL_LENGTH_LIMIT, "10"); + state.setProp(HiveAvroSerDeManager.USE_SCHEMA_TEMP_FILE, "true"); + state.setProp(HiveAvroSerDeManager.SCHEMA_FILE_NAME, SCHEMA_FILE_NAME); + state.setProp(HiveAvroSerDeManager.USE_SCHEMA_TEMP_FILE, "true"); + + validateSchemaUrl(state, SCHEMA_FILE_NAME, false); + } + + @Test + public void testSchemaUrlWithTempFileAndExistingFile() throws IOException { + final String SCHEMA_FILE_NAME = "test_temp.avsc"; + State state = new State(); + state.setProp(HiveAvroSerDeManager.SCHEMA_LITERAL_LENGTH_LIMIT, "10"); + state.setProp(HiveAvroSerDeManager.USE_SCHEMA_TEMP_FILE, "true"); + state.setProp(HiveAvroSerDeManager.SCHEMA_FILE_NAME, SCHEMA_FILE_NAME); + state.setProp(HiveAvroSerDeManager.USE_SCHEMA_TEMP_FILE, "true"); + + validateSchemaUrl(state, SCHEMA_FILE_NAME, true); + } + + private void validateSchemaUrl(State state, String targetSchemaFileName, boolean createConflictingFile) throws IOException { + HiveAvroSerDeManager manager = new HiveAvroSerDeManager(state); + HiveRegistrationUnit registrationUnit = (new HiveTable.Builder()).withDbName(TEST_DB).withTableName(TEST_TABLE).build(); + + // Clean up existing file + String targetPathStr = new Path(this.testBasePath, targetSchemaFileName).toString(); + File targetFile = new File(targetPathStr); + targetFile.delete(); + + // create a conflicting file + if (createConflictingFile) { + targetFile.createNewFile(); + } + + manager.addSerDeProperties(this.testBasePath, registrationUnit); + + Assert.assertNull(registrationUnit.getSerDeProps().getProp(HiveAvroSerDeManager.SCHEMA_LITERAL)); + String schemaUrl = registrationUnit.getSerDeProps().getProp(HiveAvroSerDeManager.SCHEMA_URL); + Assert.assertEquals(schemaUrl, targetPathStr); + Assert.assertTrue(IOUtils.contentEquals(this.getClass().getResourceAsStream("/test-hive-table/hive-test.avsc"), + new FileInputStream(schemaUrl))); + } + + @AfterClass + public void tearDown() throws IOException { + FileSystem fs = FileSystem.getLocal(new Configuration()); + fs.delete(this.testBasePath, true); + } + +} diff --git a/gobblin-hive-registration/src/test/resources/test-hive-table/hive-test.avsc b/gobblin-hive-registration/src/test/resources/test-hive-table/hive-test.avsc new file mode 100755 index 00000000000..80fc01e5f2d Binary files /dev/null and b/gobblin-hive-registration/src/test/resources/test-hive-table/hive-test.avsc differ diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java index 52de135a48b..36a88bbd148 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java @@ -387,21 +387,63 @@ public static Schema parseSchemaFromFile(Path filePath, FileSystem fs) throws IO public static void writeSchemaToFile(Schema schema, Path filePath, FileSystem fs, boolean overwrite) throws IOException { - writeSchemaToFile(schema, filePath, fs, overwrite, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ)); + writeSchemaToFile(schema, filePath, null, fs, overwrite); + } + + public static void writeSchemaToFile(Schema schema, Path filePath, Path tempFilePath, FileSystem fs, boolean overwrite) + throws IOException { + writeSchemaToFile(schema, filePath, tempFilePath, fs, overwrite, new FsPermission(FsAction.ALL, FsAction.ALL, + FsAction.READ)); } public static void writeSchemaToFile(Schema schema, Path filePath, FileSystem fs, boolean overwrite, FsPermission perm) throws IOException { + writeSchemaToFile(schema, filePath, null, fs, overwrite, perm); + } + + /** + * Write a schema to a file + * @param schema the schema + * @param filePath the target file + * @param tempFilePath if not null then this path is used for a temporary file used to stage the write + * @param fs a {@link FileSystem} + * @param overwrite should any existing target file be overwritten? + * @param perm permissions + * @throws IOException + */ + public static void writeSchemaToFile(Schema schema, Path filePath, Path tempFilePath, FileSystem fs, boolean overwrite, + FsPermission perm) + throws IOException { + boolean fileExists = fs.exists(filePath); + if (!overwrite) { - Preconditions.checkState(!fs.exists(filePath), filePath + " already exists"); + Preconditions.checkState(!fileExists, filePath + " already exists"); } else { - HadoopUtils.deletePath(fs, filePath, true); + // delete the target file now if not using a staging file + if (fileExists && null == tempFilePath) { + HadoopUtils.deletePath(fs, filePath, true); + // file has been removed + fileExists = false; + } } - try (DataOutputStream dos = fs.create(filePath)) { + // If the file exists then write to a temp file to make the replacement as close to atomic as possible + Path writeFilePath = fileExists ? tempFilePath : filePath; + + try (DataOutputStream dos = fs.create(writeFilePath)) { dos.writeChars(schema.toString()); } - fs.setPermission(filePath, perm); + fs.setPermission(writeFilePath, perm); + + // Replace existing file with the staged file + if (fileExists) { + if (!fs.delete(filePath, true)) { + throw new IOException( + String.format("Failed to delete %s while renaming %s to %s", filePath, tempFilePath, filePath)); + } + + HadoopUtils.movePath(fs, tempFilePath, fs, filePath, true, fs.getConf()); + } } /**