Skip to content

Commit

Permalink
[GOBBLIN-583] Add support for writing the schema file using a staging…
Browse files Browse the repository at this point in the history
… file when there is an existing target file

Closes #2447 from htran1/schema_avsc_permissions
  • Loading branch information
htran1 committed Sep 12, 2018
1 parent a6ec4a9 commit d3ba836
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 6 deletions.
Expand Up @@ -56,6 +56,10 @@ public class HiveAvroSerDeManager extends HiveSerDeManager {
public static final boolean DEFAULT_USE_SCHEMA_FILE = false;
public static final String SCHEMA_FILE_NAME = "schema.file.name";
public static final String DEFAULT_SCHEMA_FILE_NAME = "_schema.avsc";
public static final String SCHEMA_TEMP_FILE_NAME = "schema.temp.file.name";
public static final String DEFAULT_SCHEMA_TEMP_FILE_NAME = "_schema_temp.avsc";
public static final String USE_SCHEMA_TEMP_FILE = "use.schema.temp.file";
public static final boolean DEFAULT_USE_SCHEMA_TEMP_FILE = false;
public static final String SCHEMA_LITERAL_LENGTH_LIMIT = "schema.literal.length.limit";
public static final int DEFAULT_SCHEMA_LITERAL_LENGTH_LIMIT = 4000;
public static final String HIVE_SPEC_SCHEMA_READING_TIMER = "hiveAvroSerdeManager.schemaReadTimer";
Expand All @@ -64,6 +68,8 @@ public class HiveAvroSerDeManager extends HiveSerDeManager {
protected final FileSystem fs;
protected final boolean useSchemaFile;
protected final String schemaFileName;
protected final boolean useSchemaTempFile;
protected final String schemaTempFileName;
protected final int schemaLiteralLengthLimit;
protected final HiveSerDeWrapper serDeWrapper = HiveSerDeWrapper.get("AVRO");

Expand All @@ -79,7 +85,9 @@ public HiveAvroSerDeManager(State props) throws IOException {
}

this.useSchemaFile = props.getPropAsBoolean(USE_SCHEMA_FILE, DEFAULT_USE_SCHEMA_FILE);
this.useSchemaTempFile = props.getPropAsBoolean(USE_SCHEMA_TEMP_FILE, DEFAULT_USE_SCHEMA_TEMP_FILE);
this.schemaFileName = props.getProp(SCHEMA_FILE_NAME, DEFAULT_SCHEMA_FILE_NAME);
this.schemaTempFileName = props.getProp(SCHEMA_TEMP_FILE_NAME, DEFAULT_SCHEMA_TEMP_FILE_NAME);
this.schemaLiteralLengthLimit =
props.getPropAsInt(SCHEMA_LITERAL_LENGTH_LIMIT, DEFAULT_SCHEMA_LITERAL_LENGTH_LIMIT);

Expand Down Expand Up @@ -172,7 +180,13 @@ protected void addSchemaFromAvroFile(Schema schema, Path schemaFile, HiveRegistr
if (schemaStr.length() <= this.schemaLiteralLengthLimit) {
hiveUnit.setSerDeProp(SCHEMA_LITERAL, schema.toString());
} else {
AvroUtils.writeSchemaToFile(schema, schemaFile, this.fs, true);
Path schemaTempFile = null;

if (useSchemaTempFile) {
schemaTempFile = new Path(schemaFile.getParent(), this.schemaTempFileName);
}

AvroUtils.writeSchemaToFile(schema, schemaFile, schemaTempFile, this.fs, true);
log.info("Using schema file " + schemaFile.toString());
hiveUnit.setSerDeProp(SCHEMA_URL, schemaFile.toString());
}
Expand Down
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.hive.avro;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.hive.HiveRegistrationUnit;
import org.apache.gobblin.hive.HiveTable;


@Test(singleThreaded = true)
public class HiveAvroSerDeManagerTest {
  private static final String TEST_DB = "testDB";
  private static final String TEST_TABLE = "testTable";
  private Path testBasePath;

  @BeforeClass
  public void setUp() throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    this.testBasePath = new Path("testdir");
    // Remove any leftovers from a previous run before creating a fresh directory.
    fs.delete(this.testBasePath, true);
    fs.mkdirs(this.testBasePath);

    Files.copy(this.getClass().getResourceAsStream("/test-hive-table/hive-test.avro"),
        Paths.get(this.testBasePath.toString(), "hive-test.avro"), StandardCopyOption.REPLACE_EXISTING);
  }

  /**
   * Test that the schema is written to the schema literal
   */
  @Test
  public void testSchemaLiteral() throws IOException {
    State state = new State();
    HiveAvroSerDeManager manager = new HiveAvroSerDeManager(state);
    HiveRegistrationUnit registrationUnit = (new HiveTable.Builder()).withDbName(TEST_DB).withTableName(TEST_TABLE).build();

    manager.addSerDeProperties(this.testBasePath, registrationUnit);

    Assert.assertTrue(registrationUnit.getSerDeProps().getProp(HiveAvroSerDeManager.SCHEMA_LITERAL).contains("example.avro"));
  }

  /**
   * Test that the schema is written to a schema file when it exceeds the literal length limit.
   */
  @Test
  public void testSchemaUrl() throws IOException {
    State state = new State();
    state.setProp(HiveAvroSerDeManager.SCHEMA_LITERAL_LENGTH_LIMIT, "10");

    validateSchemaUrl(state, HiveAvroSerDeManager.DEFAULT_SCHEMA_FILE_NAME, false);
  }

  /**
   * Test that an existing schema file is overwritten when not using a temp (staging) file.
   */
  @Test
  public void testSchemaUrlWithExistingFile() throws IOException {
    State state = new State();
    state.setProp(HiveAvroSerDeManager.SCHEMA_LITERAL_LENGTH_LIMIT, "10");

    validateSchemaUrl(state, HiveAvroSerDeManager.DEFAULT_SCHEMA_FILE_NAME, true);
  }

  /**
   * Test schema file creation when staging through a temp file is enabled.
   */
  @Test
  public void testSchemaUrlWithTempFile() throws IOException {
    final String targetSchemaFileName = "test_temp.avsc";
    State state = new State();
    state.setProp(HiveAvroSerDeManager.SCHEMA_LITERAL_LENGTH_LIMIT, "10");
    state.setProp(HiveAvroSerDeManager.USE_SCHEMA_TEMP_FILE, "true");
    state.setProp(HiveAvroSerDeManager.SCHEMA_FILE_NAME, targetSchemaFileName);

    validateSchemaUrl(state, targetSchemaFileName, false);
  }

  /**
   * Test that an existing schema file is replaced via the temp (staging) file path.
   */
  @Test
  public void testSchemaUrlWithTempFileAndExistingFile() throws IOException {
    final String targetSchemaFileName = "test_temp.avsc";
    State state = new State();
    state.setProp(HiveAvroSerDeManager.SCHEMA_LITERAL_LENGTH_LIMIT, "10");
    state.setProp(HiveAvroSerDeManager.USE_SCHEMA_TEMP_FILE, "true");
    state.setProp(HiveAvroSerDeManager.SCHEMA_FILE_NAME, targetSchemaFileName);

    validateSchemaUrl(state, targetSchemaFileName, true);
  }

  /**
   * Register the test table and verify that the schema URL points at a file whose content matches
   * the expected schema.
   *
   * @param state configuration for the {@link HiveAvroSerDeManager}
   * @param targetSchemaFileName file name the schema is expected to be written to
   * @param createConflictingFile whether to pre-create an empty target file to simulate an existing schema file
   * @throws IOException on filesystem failures
   */
  private void validateSchemaUrl(State state, String targetSchemaFileName, boolean createConflictingFile) throws IOException {
    HiveAvroSerDeManager manager = new HiveAvroSerDeManager(state);
    HiveRegistrationUnit registrationUnit = (new HiveTable.Builder()).withDbName(TEST_DB).withTableName(TEST_TABLE).build();

    // Clean up existing file
    String targetPathStr = new Path(this.testBasePath, targetSchemaFileName).toString();
    File targetFile = new File(targetPathStr);
    targetFile.delete();

    // create a conflicting file
    if (createConflictingFile) {
      targetFile.createNewFile();
    }

    manager.addSerDeProperties(this.testBasePath, registrationUnit);

    Assert.assertNull(registrationUnit.getSerDeProps().getProp(HiveAvroSerDeManager.SCHEMA_LITERAL));
    String schemaUrl = registrationUnit.getSerDeProps().getProp(HiveAvroSerDeManager.SCHEMA_URL);
    Assert.assertEquals(schemaUrl, targetPathStr);
    // Close the stream deterministically instead of leaking it (original left the FileInputStream open).
    try (FileInputStream actual = new FileInputStream(schemaUrl)) {
      Assert.assertTrue(IOUtils.contentEquals(this.getClass().getResourceAsStream("/test-hive-table/hive-test.avsc"),
          actual));
    }
  }

  @AfterClass
  public void tearDown() throws IOException {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    fs.delete(this.testBasePath, true);
  }

}
Binary file not shown.
Expand Up @@ -387,21 +387,63 @@ public static Schema parseSchemaFromFile(Path filePath, FileSystem fs) throws IO

/**
 * Write a schema to a file with the default permissions, without a staging (temp) file.
 *
 * <p>Delegates to {@code writeSchemaToFile(schema, filePath, tempFilePath, fs, overwrite)} with a
 * {@code null} temp file path, meaning any existing target file is deleted and rewritten in place.
 *
 * @param schema the schema to write
 * @param filePath the target file
 * @param fs a {@link FileSystem}
 * @param overwrite should any existing target file be overwritten?
 * @throws IOException on filesystem failures
 */
public static void writeSchemaToFile(Schema schema, Path filePath, FileSystem fs, boolean overwrite)
    throws IOException {
  writeSchemaToFile(schema, filePath, null, fs, overwrite);
}

/**
 * Write a schema to a file with the default permissions (rwxrwxr--), optionally staging the write
 * through a temp file.
 *
 * @param schema the schema to write
 * @param filePath the target file
 * @param tempFilePath if not null then this path is used for a temporary file used to stage the write
 * @param fs a {@link FileSystem}
 * @param overwrite should any existing target file be overwritten?
 * @throws IOException on filesystem failures
 */
public static void writeSchemaToFile(Schema schema, Path filePath, Path tempFilePath, FileSystem fs, boolean overwrite)
    throws IOException {
  writeSchemaToFile(schema, filePath, tempFilePath, fs, overwrite, new FsPermission(FsAction.ALL, FsAction.ALL,
      FsAction.READ));
}

/**
 * Write a schema to a file with the given permissions, without a staging (temp) file: any
 * existing target file is deleted and rewritten in place.
 *
 * @param schema the schema to write
 * @param filePath the target file
 * @param fs a {@link FileSystem}
 * @param overwrite should any existing target file be overwritten?
 * @param perm permissions to set on the written file
 * @throws IOException on filesystem failures
 */
public static void writeSchemaToFile(Schema schema, Path filePath, FileSystem fs, boolean overwrite, FsPermission perm)
    throws IOException {
  writeSchemaToFile(schema, filePath, null, fs, overwrite, perm);
}

/**
* Write a schema to a file
* @param schema the schema
* @param filePath the target file
* @param tempFilePath if not null then this path is used for a temporary file used to stage the write
* @param fs a {@link FileSystem}
* @param overwrite should any existing target file be overwritten?
* @param perm permissions
* @throws IOException
*/
public static void writeSchemaToFile(Schema schema, Path filePath, Path tempFilePath, FileSystem fs, boolean overwrite,
FsPermission perm)
throws IOException {
boolean fileExists = fs.exists(filePath);

if (!overwrite) {
Preconditions.checkState(!fs.exists(filePath), filePath + " already exists");
Preconditions.checkState(!fileExists, filePath + " already exists");
} else {
HadoopUtils.deletePath(fs, filePath, true);
// delete the target file now if not using a staging file
if (fileExists && null == tempFilePath) {
HadoopUtils.deletePath(fs, filePath, true);
// file has been removed
fileExists = false;
}
}

try (DataOutputStream dos = fs.create(filePath)) {
// If the file exists then write to a temp file to make the replacement as close to atomic as possible
Path writeFilePath = fileExists ? tempFilePath : filePath;

try (DataOutputStream dos = fs.create(writeFilePath)) {
dos.writeChars(schema.toString());
}
fs.setPermission(filePath, perm);
fs.setPermission(writeFilePath, perm);

// Replace existing file with the staged file
if (fileExists) {
if (!fs.delete(filePath, true)) {
throw new IOException(
String.format("Failed to delete %s while renaming %s to %s", filePath, tempFilePath, filePath));
}

HadoopUtils.movePath(fs, tempFilePath, fs, filePath, true, fs.getConf());
}
}

/**
Expand Down

0 comments on commit d3ba836

Please sign in to comment.