From 90965326f8a483dfe6a2ddb3e97ff50c98c661ed Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Fri, 2 Oct 2015 16:09:38 +0900 Subject: [PATCH] TAJO-1903: Insert clause occasionally fails on S3. --- .../tajo/storage/avro/AvroAppender.java | 7 +--- .../apache/tajo/storage/rcfile/RCFile.java | 6 +-- .../sequencefile/SequenceFileAppender.java | 5 --- .../tajo/storage/text/DelimitedTextFile.java | 17 +------- .../org/apache/tajo/storage/TestStorages.java | 41 ++++++++++++++++++- 5 files changed, 44 insertions(+), 32 deletions(-) diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java index ff0e8c04ca..e54fb8006d 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java @@ -36,7 +36,6 @@ import org.apache.tajo.storage.TableStatistics; import org.apache.tajo.storage.Tuple; -import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; @@ -71,10 +70,8 @@ public AvroAppender(Configuration conf, */ public void init() throws IOException { FileSystem fs = path.getFileSystem(conf); - if (!fs.exists(path.getParent())) { - throw new FileNotFoundException(path.toString()); - } - FSDataOutputStream outputStream = fs.create(path); + + FSDataOutputStream outputStream = fs.create(path, false); avroSchema = AvroUtil.getAvroSchema(meta, conf); avroFields = avroSchema.getFields(); diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java index 38a476149b..ed55506d22 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java +++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java @@ -727,10 +727,6 @@ public RCFileAppender(Configuration conf, final TaskAttemptId taskAttemptId, public void init() throws IOException { fs = path.getFileSystem(conf); - if (!fs.exists(path.getParent())) { - throw new FileNotFoundException(path.toString()); - } - if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) { String codecClassname = this.meta.getOption(StorageConstants.COMPRESSION_CODEC); try { @@ -773,7 +769,7 @@ public void init() throws IOException { columnBuffers[i] = new ColumnBuffer(); } - init(conf, fs.create(path, true, 4096, (short) 3, fs.getDefaultBlockSize(), null), codec, metadata); + init(conf, fs.create(path, false, 4096, (short) 3, fs.getDefaultBlockSize(), null), codec, metadata); initializeFileHeader(); writeFileHeader(); finalizeFileHeader(); diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java index ad622fe5e0..b1a14e356d 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java @@ -40,7 +40,6 @@ import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; import org.apache.tajo.util.BytesUtils; -import java.io.FileNotFoundException; import java.io.IOException; public class SequenceFileAppender extends FileAppender { @@ -95,10 +94,6 @@ public void init() throws IOException { nullChars = nullCharacters.getBytes(); } - if (!fs.exists(path.getParent())) { - throw new FileNotFoundException(path.toString()); - } - if(this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) { String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC); codecFactory = new 
CompressionCodecFactory(conf); diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java index b5155d4169..c0ee7841cf 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -39,7 +39,6 @@ import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.storage.*; import org.apache.tajo.storage.compress.CodecPool; -import org.apache.tajo.storage.exception.AlreadyExistsStorageException; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; import org.apache.tajo.unit.StorageUnit; @@ -47,7 +46,6 @@ import java.io.BufferedOutputStream; import java.io.DataOutputStream; -import java.io.FileNotFoundException; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -133,10 +131,6 @@ public TextLineSerDe getLineSerde() { @Override public void init() throws IOException { - if (!fs.exists(path.getParent())) { - throw new FileNotFoundException(path.toString()); - } - if (enabledStats) { this.stats = new TableStatistics(this.schema); } @@ -163,19 +157,12 @@ public void init() throws IOException { String extension = codec.getDefaultExtension(); compressedPath = path.suffix(extension); - if (fs.exists(compressedPath)) { - throw new AlreadyExistsStorageException(compressedPath); - } - - fos = fs.create(compressedPath); + fos = fs.create(compressedPath, false); deflateFilter = codec.createOutputStream(fos, compressor); outputStream = new DataOutputStream(deflateFilter); } else { - if (fs.exists(path)) { - throw new AlreadyExistsStorageException(path); - } - fos = fs.create(path); + fos = fs.create(path, false); outputStream = new DataOutputStream(new 
BufferedOutputStream(fos)); } diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index 278de45a55..575126ccd7 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import org.apache.tajo.BuiltinStorages; @@ -55,8 +56,7 @@ import java.util.Collection; import java.util.List; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; @RunWith(Parameterized.class) public class TestStorages { @@ -1114,4 +1114,41 @@ public final void testInsertFixedCharTypeWithOverSize() throws Exception { assertTrue(ok); } + + @Test + public void testFileAlreadyExists() throws IOException { + + if (internalType) return; + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("score", Type.FLOAT4); + + TableMeta meta = CatalogUtil.newTableMeta(dataFormat); + meta.setOptions(CatalogUtil.newDefaultProperty(dataFormat)); + if (dataFormat.equalsIgnoreCase(BuiltinStorages.AVRO)) { + meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL, + TEST_PROJECTION_AVRO_SCHEMA); + } + + FileTablespace sm = TablespaceManager.getLocalFs(); + Path tablePath = new Path(testDir, "testFileAlreadyExists.data"); + + Appender appender = sm.getAppender(meta, schema, tablePath); + appender.init(); + appender.close(); + + try { + appender = sm.getAppender(meta, schema, tablePath); + appender.init(); + if (BuiltinStorages.ORC.equals(dataFormat)) { + 
appender.close(); + } + fail(dataFormat); + } catch (IOException e) { + } finally { + IOUtils.cleanup(null, appender); + } + } } \ No newline at end of file