Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.tajo.storage.TableStatistics;
import org.apache.tajo.storage.Tuple;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
Expand Down Expand Up @@ -71,10 +70,8 @@ public AvroAppender(Configuration conf,
*/
public void init() throws IOException {
FileSystem fs = path.getFileSystem(conf);
if (!fs.exists(path.getParent())) {
throw new FileNotFoundException(path.toString());
}
FSDataOutputStream outputStream = fs.create(path);

FSDataOutputStream outputStream = fs.create(path, false);

avroSchema = AvroUtil.getAvroSchema(meta, conf);
avroFields = avroSchema.getFields();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,10 +727,6 @@ public RCFileAppender(Configuration conf, final TaskAttemptId taskAttemptId,
public void init() throws IOException {
fs = path.getFileSystem(conf);

if (!fs.exists(path.getParent())) {
throw new FileNotFoundException(path.toString());
}

if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
String codecClassname = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
try {
Expand Down Expand Up @@ -773,7 +769,7 @@ public void init() throws IOException {
columnBuffers[i] = new ColumnBuffer();
}

init(conf, fs.create(path, true, 4096, (short) 3, fs.getDefaultBlockSize(), null), codec, metadata);
init(conf, fs.create(path, false, 4096, (short) 3, fs.getDefaultBlockSize(), null), codec, metadata);
initializeFileHeader();
writeFileHeader();
finalizeFileHeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
import org.apache.tajo.util.BytesUtils;

import java.io.FileNotFoundException;
import java.io.IOException;

public class SequenceFileAppender extends FileAppender {
Expand Down Expand Up @@ -95,10 +94,6 @@ public void init() throws IOException {
nullChars = nullCharacters.getBytes();
}

if (!fs.exists(path.getParent())) {
throw new FileNotFoundException(path.toString());
}

if(this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
codecFactory = new CompressionCodecFactory(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,13 @@
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.compress.CodecPool;
import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.ReflectionUtil;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -133,10 +131,6 @@ public TextLineSerDe getLineSerde() {

@Override
public void init() throws IOException {
if (!fs.exists(path.getParent())) {
throw new FileNotFoundException(path.toString());
}

if (enabledStats) {
this.stats = new TableStatistics(this.schema);
}
Expand All @@ -163,19 +157,12 @@ public void init() throws IOException {
String extension = codec.getDefaultExtension();
compressedPath = path.suffix(extension);

if (fs.exists(compressedPath)) {
throw new AlreadyExistsStorageException(compressedPath);
}

fos = fs.create(compressedPath);
fos = fs.create(compressedPath, false);
deflateFilter = codec.createOutputStream(fos, compressor);
outputStream = new DataOutputStream(deflateFilter);

} else {
if (fs.exists(path)) {
throw new AlreadyExistsStorageException(path);
}
fos = fs.create(path);
fos = fs.create(path, false);
outputStream = new DataOutputStream(new BufferedOutputStream(fos));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.tajo.BuiltinStorages;
Expand Down Expand Up @@ -57,6 +58,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith(Parameterized.class)
public class TestStorages {
Expand Down Expand Up @@ -1154,4 +1156,41 @@ public void testDateTextHandling() throws Exception {
OldStorageManager.clearCache();
}
}

@Test
public void testFileAlreadyExists() throws IOException {

if (internalType) return;

Schema schema = new Schema();
schema.addColumn("id", Type.INT4);
schema.addColumn("age", Type.INT8);
schema.addColumn("score", Type.FLOAT4);

TableMeta meta = CatalogUtil.newTableMeta(dataFormat);
meta.setOptions(CatalogUtil.newDefaultProperty(dataFormat));
if (dataFormat.equalsIgnoreCase(BuiltinStorages.AVRO)) {
meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
TEST_PROJECTION_AVRO_SCHEMA);
}

FileTablespace sm = TablespaceManager.getLocalFs();
Path tablePath = new Path(testDir, "testFileAlreadyExists.data");

Appender appender = sm.getAppender(meta, schema, tablePath);
appender.init();
appender.close();

try {
appender = sm.getAppender(meta, schema, tablePath);
appender.init();
if (BuiltinStorages.ORC.equals(dataFormat)) {
appender.close();
}
fail(dataFormat);
} catch (IOException e) {
} finally {
IOUtils.cleanup(null, appender);
}
}
}