Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ public class ParquetFileWriter {
/** Magic byte sequence: the ASCII encoding of the string "PAR1". */
public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
/** Current version number (presumably of the file format; confirm against its usages). */
public static final int CURRENT_VERSION = 1;

// File creation modes, accepted by the ParquetFileWriter constructor that takes a mode argument.
/** Creation mode for which the constructor passes overwrite = false when creating the file. */
public static final int CREATE = 0;
/** Creation mode for which the constructor passes overwrite = true when creating the file. */
public static final int OVERWRITE = 1;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create an enum for this.


private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();

private final MessageType schema;
Expand Down Expand Up @@ -141,17 +145,29 @@ private final STATE error() throws IOException {
private STATE state = STATE.NOT_STARTED;

/**
 * Creates a writer for the given file using the {@link #CREATE} mode, by
 * delegating to the constructor that takes an explicit creation mode.
 *
 * @param configuration Hadoop configuration
 * @param schema the schema of the data
 * @param file the file to write to
 * @throws IOException if the file cannot be created
 */
public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException {
this(configuration, schema, file, CREATE);
}

/**
 * Creates a writer for the given file, opening the output stream according
 * to the requested creation mode.
 *
 * @param configuration Hadoop configuration used to resolve the file system of {@code file}
 * @param schema the schema of the data
 * @param file the file to write to
 * @param mode file creation mode; {@link #OVERWRITE} requests that an existing
 *             file be overwritten, any other value (including {@link #CREATE})
 *             creates the file with the overwrite flag set to false
 * @throws IOException if the file cannot be created
 */
public ParquetFileWriter(Configuration configuration, MessageType schema,
    Path file, int mode) throws IOException {
  this.schema = schema;
  FileSystem fs = file.getFileSystem(configuration);
  // Open the stream exactly once, with the overwrite flag derived from the mode.
  // An additional unconditional create(file, false) here would fail on an
  // existing file before OVERWRITE could take effect.
  boolean overwriteFlag = (mode == OVERWRITE);
  this.out = fs.create(file, overwriteFlag);
}

/**
Expand Down
35 changes: 34 additions & 1 deletion parquet-hadoop/src/main/java/parquet/hadoop/ParquetWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,44 @@ public ParquetWriter(
boolean validating,
WriterVersion writerVersion,
Configuration conf) throws IOException {
this(file, ParquetFileWriter.CREATE, writeSupport, compressionCodecName,
blockSize, pageSize, dictionaryPageSize, enableDictionary,
validating, writerVersion, conf);
}
/**
* Create a new ParquetWriter.
*
* @param file the file to create
* @param mode file creation mode
* @param writeSupport the implementation to write a record to a RecordConsumer
* @param compressionCodecName the compression codec to use
* @param blockSize the block size threshold
* @param pageSize the page size threshold
* @param dictionaryPageSize the page size threshold for the dictionary pages
* @param enableDictionary to turn dictionary encoding on
* @param validating to turn on validation using the schema
* @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion}
* @param conf Hadoop configuration to use while accessing the filesystem
* @throws IOException
*/
public ParquetWriter(
Path file,
int mode,
WriteSupport<T> writeSupport,
CompressionCodecName compressionCodecName,
int blockSize,
int pageSize,
int dictionaryPageSize,
boolean enableDictionary,
boolean validating,
WriterVersion writerVersion,
Configuration conf) throws IOException {

WriteSupport.WriteContext writeContext = writeSupport.init(conf);
MessageType schema = writeContext.getSchema();

ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file);
ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file,
mode);
fileWriter.start();

CodecFactory codecFactory = new CodecFactory(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,39 @@
public class TestParquetFileWriter {
private static final Log LOG = Log.getLog(TestParquetFileWriter.class);

/**
 * Checks both file creation modes against a file that already exists:
 * CREATE must fail with an IOException, OVERWRITE must succeed.
 */
@Test
public void testWriteMode() throws Exception {
  // Make sure the target file exists before any writer is constructed.
  File dir = new File("target/test/TestParquetFileWriter/");
  dir.mkdirs();
  File existing = new File(dir, "testParquetFile").getAbsoluteFile();
  existing.createNewFile();
  Path target = new Path(existing.toURI());

  MessageType schema = MessageTypeParser.parseMessageType(
      "message m { required group a {required binary b;} required group "
      + "c { required int64 d; }}");
  Configuration conf = new Configuration();

  // CREATE on an existing file is expected to be rejected.
  boolean createFailed = false;
  try {
    new ParquetFileWriter(conf, schema, target, ParquetFileWriter.CREATE);
  } catch (IOException expected) {
    createFailed = true;
  }
  assertTrue(createFailed);

  // OVERWRITE on the same file is expected to go through.
  boolean overwriteFailed = false;
  try {
    new ParquetFileWriter(conf, schema, target, ParquetFileWriter.OVERWRITE);
  } catch (IOException unexpected) {
    overwriteFailed = true;
  }
  assertTrue(!overwriteFailed);

  existing.delete();
}

@Test
public void testWriteRead() throws Exception {

Expand Down