DRILL-2408 (part 2): CTAS should not create empty folders when underlying query returns no results

- Changed ParquetRecordWriter to avoid creating the parquet file until the first row of data is available (see the condensed sketch below the file summary)
- Moved the unit tests into a separate test class that starts 3 drillbits, to cover the case where multiple fragments attempt to write empty parquet files
- Changed BaseTestQuery to update the storage plugin in all started drillbits, not just the first one
adeneche authored and Aman Sinha committed May 7, 2015
1 parent d12bee0 commit 868ce4d
Showing 6 changed files with 166 additions and 106 deletions.
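
The first bullet is easiest to follow when the relevant pieces of ParquetRecordWriter are read together. The following is a condensed, non-compilable sketch assembled from the hunks below (field and method names are the ones in the diff); it is not the complete class:

    // Condensed from the ParquetRecordWriter hunks below; not standalone code.
    // The parquet file is created lazily, on the first completed record.
    public void endRecord() throws IOException {
      consumer.endMessage();

      // we wait until there is at least one record before creating the parquet file
      if (parquetFileWriter == null) {
        Path path = new Path(location, prefix + "_" + index + ".parquet");
        parquetFileWriter = new ParquetFileWriter(conf, schema, path);
        parquetFileWriter.start();
      }

      recordCount++;
      checkBlockSizeReached();
    }

    // flush() only finalizes a file when at least one record was written, so a
    // fragment that produced no rows never leaves an empty file behind.
    private void flush() throws IOException {
      if (recordCount > 0) {
        parquetFileWriter.startBlock(recordCount);
        store.flush();
        ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
        recordCount = 0;
        parquetFileWriter.endBlock();
        parquetFileWriter.end(extraMetaData);  // one block per file
        parquetFileWriter = null;
      }

      store.close();
      ColumnChunkPageWriteStoreExposer.close(pageStore);
      store = null;
    }

With this shape, cleanup() simply delegates to flush(), which is why the earlier delete-the-empty-file fallback can be dropped from the writer.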
ParquetRecordWriter.java

@@ -25,7 +25,6 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ExecConstants;
@@ -64,20 +63,19 @@
 import com.google.common.collect.Lists;
 
 public class ParquetRecordWriter extends ParquetOutputRecordWriter {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordWriter.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordWriter.class);
 
   private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
   private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
 
   private ParquetFileWriter parquetFileWriter;
   private MessageType schema;
-  private Map<String, String> extraMetaData = new HashMap();
+  private Map<String, String> extraMetaData = new HashMap<>();
   private int blockSize;
-  private int pageSize = 1 * 1024 * 1024;
+  private int pageSize = 1024 * 1024;
   private int dictionaryPageSize = pageSize;
   private boolean enableDictionary = false;
-  private boolean validating = false;
   private CompressionCodecName codec = CompressionCodecName.SNAPPY;
   private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
   private DirectCodecFactory codecFactory;
@@ -96,7 +94,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private String prefix;
   private int index = 0;
   private OperatorContext oContext;
-  private ParquetDirectByteBufferAllocator allocator;
 
   public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{
     super();
@@ -152,29 +149,18 @@ private void newSchema() throws IOException {
     }
     schema = new MessageType("root", types);
 
-    Path fileName = getPath();
-    parquetFileWriter = new ParquetFileWriter(conf, schema, fileName);
-    parquetFileWriter.start();
-
     int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / this.schema.getColumns().size() / 5);
     pageStore = ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(this.oContext,
         codecFactory.getCompressor(codec, pageSize),
         schema,
         initialBlockBufferSize);
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
     store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
-    MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(this.schema);
+    MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
     consumer = columnIO.getRecordWriter(store);
     setUp(schema, consumer);
   }
 
-  /**
-   * @return Path for the latest file created
-   */
-  private Path getPath() {
-    return new Path(location, prefix + "_" + index + ".parquet");
-  }
-
   private PrimitiveType getPrimitiveType(MaterializedField field) {
     MinorType minorType = field.getType().getMinorType();
     String name = field.getLastName();
@@ -204,12 +190,18 @@ private parquet.schema.Type getType(MaterializedField field) {
   }
 
   private void flush() throws IOException {
-    parquetFileWriter.startBlock(recordCount);
-    store.flush();
-    ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
-    recordCount = 0;
-    parquetFileWriter.endBlock();
-    parquetFileWriter.end(extraMetaData);
+    if (recordCount > 0) {
+      parquetFileWriter.startBlock(recordCount);
+      store.flush();
+      ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
+      recordCount = 0;
+      parquetFileWriter.endBlock();
+
+      // we are writing one single block per file
+      parquetFileWriter.end(extraMetaData);
+      parquetFileWriter = null;
+    }
+
     store.close();
     ColumnChunkPageWriteStoreExposer.close(pageStore);
     store = null;
@@ -307,7 +299,16 @@ public void startRecord() throws IOException {
   @Override
   public void endRecord() throws IOException {
     consumer.endMessage();
+
+    // we wait until there is at least one record before creating the parquet file
+    if (parquetFileWriter == null) {
+      Path path = new Path(location, prefix + "_" + index + ".parquet");
+      parquetFileWriter = new ParquetFileWriter(conf, schema, path);
+      parquetFileWriter.start();
+    }
+
     recordCount++;
+
     checkBlockSizeReached();
   }
 
@@ -317,34 +318,8 @@ public void abort() throws IOException {
 
   @Override
   public void cleanup() throws IOException {
-    boolean hasRecords = recordCount > 0;
-    if (hasRecords) {
-      parquetFileWriter.startBlock(recordCount);
-      store.flush();
-      ColumnChunkPageWriteStoreExposer.flushPageStore(pageStore, parquetFileWriter);
-      recordCount = 0;
-      parquetFileWriter.endBlock();
-      parquetFileWriter.end(extraMetaData);
-    }
-    if (store != null) {
-      store.close();
-    }
-    if (pageStore != null) {
-      ColumnChunkPageWriteStoreExposer.close(pageStore);
-    }
+    flush();
 
     codecFactory.close();
-
-    if (!hasRecords) {
-      // the very last file is empty, delete it (DRILL-2408)
-      Path path = getPath();
-      logger.debug("no record written, deleting parquet file {}", path);
-      FileSystem fs = path.getFileSystem(conf);
-      if (fs.exists(path)) {
-        if (!fs.delete(path, false)) {
-          throw new DrillRuntimeException("Couldn't delete empty file " + path);
-        }
-      }
-    }
   }
 }
TestUtilities.java

@@ -43,30 +43,37 @@ public class TestUtilities {
   private static final String dfsTestPluginName = "dfs_test";
   private static final String dfsTestTmpSchema = "tmp";
 
+  /**
+   * Create and removes a temporary folder
+   *
+   * @return absolute path to temporary folder
+   */
+  public static String createTempDir() {
+    final File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+    return tmpDir.getAbsolutePath();
+  }
+
   /**
    * Update the location of dfs_test.tmp location. Get the "dfs_test.tmp" workspace and update the location with an
    * exclusive temp directory just for use in the current test jvm.
    *
    * @param pluginRegistry
   * @return JVM exclusive temporary directory location.
    */
-  public static String updateDfsTestTmpSchemaLocation(final StoragePluginRegistry pluginRegistry)
+  public static void updateDfsTestTmpSchemaLocation(final StoragePluginRegistry pluginRegistry,
+                                                    final String tmpDirPath)
       throws ExecutionSetupException {
     final FileSystemPlugin plugin = (FileSystemPlugin) pluginRegistry.getPlugin(dfsTestPluginName);
     final FileSystemConfig pluginConfig = (FileSystemConfig) plugin.getConfig();
     final WorkspaceConfig tmpWSConfig = pluginConfig.workspaces.get(dfsTestTmpSchema);
 
-    final File tmpDir = Files.createTempDir();
-    tmpDir.deleteOnExit();
-    final WorkspaceConfig newTmpWSConfig = new WorkspaceConfig(tmpDir.getAbsolutePath(),
-        true, tmpWSConfig.getDefaultInputFormat());
+    final WorkspaceConfig newTmpWSConfig = new WorkspaceConfig(tmpDirPath, true, tmpWSConfig.getDefaultInputFormat());
 
     pluginConfig.workspaces.remove(dfsTestTmpSchema);
     pluginConfig.workspaces.put(dfsTestTmpSchema, newTmpWSConfig);
 
     pluginRegistry.createOrUpdate(dfsTestPluginName, pluginConfig, true);
-
-    return tmpDir.getAbsolutePath();
   }
 
   /**
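
With this split, a test harness creates the temporary directory once and then points any number of plugin registries at the same location. A minimal usage sketch, assuming a collection of already-started drillbits (the loop and variable names here are illustrative, not part of the patch); the BaseTestQuery hunk below does the same thing inside its startup loop:

    // Illustrative only: one shared temp dir applied to every drillbit's registry.
    final String tmpDirPath = TestUtilities.createTempDir();   // once per test JVM

    for (Drillbit bit : bits) {
      final StoragePluginRegistry pluginRegistry = bit.getContext().getStorage();
      TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, tmpDirPath);
      TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
    }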
exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java (10 changes: 6 additions & 4 deletions)
@@ -166,15 +166,17 @@ private static void openClient() throws Exception {
       serviceSet = RemoteServiceSet.getLocalServiceSet();
     }
 
+    dfsTestTmpSchemaLocation = TestUtilities.createTempDir();
+
     bits = new Drillbit[drillbitCount];
     for(int i = 0; i < drillbitCount; i++) {
       bits[i] = new Drillbit(config, serviceSet);
       bits[i].run();
-    }
 
-    final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage();
-    dfsTestTmpSchemaLocation = TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry);
-    TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
+      final StoragePluginRegistry pluginRegistry = bits[i].getContext().getStorage();
+      TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, dfsTestTmpSchemaLocation);
+      TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry);
+    }
 
     client = QueryTestUtil.createClient(config, serviceSet, MAX_WIDTH_PER_NODE, null);
   }
TestParquetWriter.java

@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.physical.impl.writer;
 
-import java.io.File;
 import java.math.BigDecimal;
 import java.sql.Date;
 
@@ -28,7 +27,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.joda.time.DateTime;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -445,22 +443,6 @@ public void testWriteDecimal() throws Exception {
     }
   }
 
-  @Test // see DRILL-2408
-  public void testWriteEmptyFile() throws Exception {
-    String outputFile = "testparquetwriter_test_write_empty_file";
-
-    try {
-      Path path = new Path(getDfsTestTmpSchemaLocation(), outputFile);
-      // test("ALTER SESSION SET `planner.add_producer_consumer` = false");
-      test("CREATE TABLE dfs_test.tmp.%s AS SELECT * FROM cp.`employee.json` WHERE 1=0", outputFile);
-
-      Assert.assertEquals(fs.listStatus(path).length, 0);
-    } finally {
-      deleteTableIfExists(outputFile);
-    }
-  }
-
   @Test // DRILL-2341
   public void tableSchemaWhenSelectFieldsInDef_SelectFieldsInView() throws Exception {
     final String newTblName = "testTableOutputSchema";
@@ -531,31 +513,6 @@ public void createTableWhenAViewWithSameNameAlreadyExists() throws Exception{
     }
   }
 
-  @Test // see DRILL-2408
-  public void testWriteEmptyFileAfterFlush() throws Exception {
-    String outputFile = "testparquetwriter_test_write_empty_file_after_flush";
-
-    try {
-      // this specific value will force a flush just after the final row is written
-      // this will cause the creation of a new "empty" parquet file
-      test("ALTER SESSION SET `store.parquet.block-size` = 19926");
-
-      String query = "SELECT * FROM cp.`employee.json` LIMIT 100";
-      test("CREATE TABLE dfs_test.tmp.%s AS %s", outputFile, query);
-
-      // this query will fail if the "empty" file wasn't deleted
-      testBuilder()
-        .unOrdered()
-        .sqlQuery("SELECT * FROM dfs_test.tmp.%s", outputFile)
-        .sqlBaselineQuery(query)
-        .go();
-    } finally {
-      // restore the session option
-      test("ALTER SESSION SET `store.parquet.block-size` = %d", ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR.getDefault().num_val);
-      deleteTableIfExists(outputFile);
-    }
-  }
-
   private static void deleteTableIfExists(String tableName) {
     try {
       Path path = new Path(getDfsTestTmpSchemaLocation(), tableName);
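
The two tests removed above are not deleted outright; per the commit message they move to a dedicated test class that starts 3 drillbits. That new class is not among the hunks shown on this page, so the following is only a hypothetical sketch of what such a relocated test could look like. The class name, the updateTestCluster() setup, and the FileSystem handling are assumptions, not taken from this commit:

    // Hypothetical sketch only: a multi-drillbit variant of the removed empty-file test.
    package org.apache.drill.exec.physical.impl.writer;

    import org.apache.drill.BaseTestQuery;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.junit.Assert;
    import org.junit.BeforeClass;
    import org.junit.Test;

    public class TestParquetWriterEmptyFilesSketch extends BaseTestQuery {

      @BeforeClass
      public static void setupCluster() throws Exception {
        // start 3 drillbits so several fragments may each attempt to write an empty file
        // (updateTestCluster() usage is assumed here, not shown in this commit)
        updateTestCluster(3, null);
      }

      @Test // see DRILL-2408
      public void testWriteEmptyFile() throws Exception {
        final String outputFile = "testparquetwriter_empty_files_sketch";
        test("CREATE TABLE dfs_test.tmp.%s AS SELECT * FROM cp.`employee.json` WHERE 1 = 0", outputFile);

        // with the writer fix, no fragment should have created a parquet file at all
        final Path path = new Path(getDfsTestTmpSchemaLocation(), outputFile);
        final FileSystem fs = FileSystem.getLocal(new Configuration());
        if (fs.exists(path)) {
          Assert.assertEquals(0, fs.listStatus(path).length);
        }
      }
    }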
