PARQUET-353: Release compression resources.
This updates the use of CodecFactory in the output format and writer
classes so that its lifecycle is tied to ParquetWriter and
ParquetRecordWriter. When those classes are closed, the resources held
by the CodecFactory associated with the instance are released.

This is an alternative to and closes #282.

Author: Ryan Blue <blue@apache.org>

Closes #295 from rdblue/PARQUET-353-release-compressor-resources and squashes the following commits:

a00f4b7 [Ryan Blue] PARQUET-353: Release compression resources.
rdblue committed Dec 11, 2015
1 parent b45c4bd commit 4916903
Showing 3 changed files with 43 additions and 25 deletions.
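
As a usage-level sketch (not part of the diff below), the effect of this change is that closing the writer is now enough to release the compressor resources, because the CodecFactory is owned by the writer itself. The snippet assumes the ParquetWriter(Path, Configuration, WriteSupport) constructor and the Group/GroupWriteSupport example classes from parquet-hadoop; the schema and output path are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class WriterLifecycleSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder schema and output path for illustration only.
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required int32 id; }");
    GroupWriteSupport.setSchema(schema, conf);

    ParquetWriter<Group> writer = new ParquetWriter<Group>(
        new Path("/tmp/example.parquet"), conf, new GroupWriteSupport());
    try {
      writer.write(new SimpleGroupFactory(schema).newGroup().append("id", 1));
    } finally {
      // Closing the writer flushes the footer and, with this change, also
      // releases the compression resources held by its internal CodecFactory.
      writer.close();
    }
  }
}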
ParquetOutputFormat.java
@@ -323,9 +323,7 @@ private static int getMaxPaddingSize(Configuration conf) {

/**
* constructor used when this OutputFormat is wrapped in another one (in Pig for example)
* @param writeSupportClass the class used to convert the incoming records
* @param schema the schema of the records
* @param extraMetaData extra meta data to be stored in the footer of the file
* @param writeSupport the class used to convert the incoming records
*/
public <S extends WriteSupport<T>> ParquetOutputFormat(S writeSupport) {
this.writeSupport = writeSupport;
@@ -387,8 +385,6 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
if (INFO) LOG.info("Min row count for page size check is: " + props.getMinRowCountForPageSizeCheck());
if (INFO) LOG.info("Min row count for page size check is: " + props.getMaxRowCountForPageSizeCheck());

CodecFactory codecFactory = new CodecFactory(conf, props.getPageSizeThreshold());

WriteContext init = writeSupport.init(conf);
ParquetFileWriter w = new ParquetFileWriter(
conf, init.getSchema(), file, Mode.CREATE, blockSize, maxPaddingSize);
@@ -411,10 +407,11 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
init.getSchema(),
init.getExtraMetaData(),
blockSize,
codecFactory.getCompressor(codec),
codec,
validating,
props,
memoryManager);
memoryManager,
conf);
}

/**
ParquetRecordWriter.java
@@ -21,13 +21,15 @@
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;

import static org.apache.parquet.Preconditions.checkNotNull;
@@ -43,8 +45,9 @@
*/
public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {

private InternalParquetRecordWriter<T> internalWriter;
private MemoryManager memoryManager;
private final InternalParquetRecordWriter<T> internalWriter;
private final MemoryManager memoryManager;
private final CodecFactory codecFactory;

/**
*
@@ -79,6 +82,7 @@ public ParquetRecordWriter(
internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
extraMetaData, blockSize, compressor, validating, props);
this.memoryManager = null;
this.codecFactory = null;
}

/**
@@ -106,14 +110,17 @@ public ParquetRecordWriter(
boolean validating,
WriterVersion writerVersion,
MemoryManager memoryManager) {
this(w, writeSupport, schema, extraMetaData, blockSize, compressor,
validating, ParquetProperties.builder()
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
.withDictionaryEncoding(enableDictionary)
.withWriterVersion(writerVersion)
.build(),
memoryManager);
ParquetProperties props = ParquetProperties.builder()
.withPageSize(pageSize)
.withDictionaryPageSize(dictionaryPageSize)
.withDictionaryEncoding(enableDictionary)
.withWriterVersion(writerVersion)
.build();
internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
extraMetaData, blockSize, compressor, validating, props);
this.memoryManager = checkNotNull(memoryManager, "memoryManager");
memoryManager.addWriter(internalWriter, blockSize);
this.codecFactory = null;
}

/**
@@ -123,7 +130,7 @@ public ParquetRecordWriter(
* @param schema the schema of the records
* @param extraMetaData extra meta data to write in the footer of the file
* @param blockSize the size of a block in the file (this will be approximate)
* @param compressor the compressor used to compress the pages
* @param codec the compression codec used to compress the pages
* @param validating if schema validation should be turned on
* @param props parquet encoding properties
*/
@@ -133,12 +140,15 @@ public ParquetRecordWriter(
MessageType schema,
Map<String, String> extraMetaData,
long blockSize,
BytesCompressor compressor,
CompressionCodecName codec,
boolean validating,
ParquetProperties props,
MemoryManager memoryManager) {
MemoryManager memoryManager,
Configuration conf) {
this.codecFactory = new CodecFactory(conf, props.getPageSizeThreshold());
internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
extraMetaData, blockSize, compressor, validating, props);
extraMetaData, blockSize, codecFactory.getCompressor(codec), validating,
props);
this.memoryManager = checkNotNull(memoryManager, "memoryManager");
memoryManager.addWriter(internalWriter, blockSize);
}
@@ -148,9 +158,16 @@ public ParquetRecordWriter(
*/
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
internalWriter.close();
if (memoryManager != null) {
memoryManager.removeWriter(internalWriter);
try {
internalWriter.close();
// release after the writer closes in case it is used for a last flush
} finally {
if (codecFactory != null) {
codecFactory.release();
}
if (memoryManager != null) {
memoryManager.removeWriter(internalWriter);
}
}
}

ParquetWriter.java
@@ -52,6 +52,7 @@ public class ParquetWriter<T> implements Closeable {
public static final int MAX_PADDING_SIZE_DEFAULT = 0;

private final InternalParquetRecordWriter<T> writer;
private final CodecFactory codecFactory;

/**
* Create a new ParquetWriter.
@@ -273,7 +274,7 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport
conf, schema, file, mode, blockSize, maxPaddingSize);
fileWriter.start();

CodecFactory codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName);
this.writer = new InternalParquetRecordWriter<T>(
fileWriter,
@@ -300,6 +301,9 @@ public void close() throws IOException {
writer.close();
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
// release after the writer closes in case it is used for a last flush
codecFactory.release();
}
}

