From 7a8aa6fbe9940cae442670c6567c28cb70785350 Mon Sep 17 00:00:00 2001 From: Mariappan Asokan Date: Fri, 30 Jan 2015 19:57:36 -0500 Subject: [PATCH] PARQUET-134 patch - Support file write mode --- .../parquet/hadoop/ParquetFileWriter.java | 29 ++++++++++++--- .../java/parquet/hadoop/ParquetWriter.java | 35 ++++++++++++++++++- .../parquet/hadoop/TestParquetFileWriter.java | 33 +++++++++++++++++ 3 files changed, 91 insertions(+), 6 deletions(-) diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java index 1c651aff9d..f4126ee049 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java @@ -66,6 +66,12 @@ public class ParquetFileWriter { public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII")); public static final int CURRENT_VERSION = 1; + // File creation modes + public static enum Mode { + CREATE, + OVERWRITE + } + private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(); private final MessageType schema; @@ -141,17 +147,30 @@ private final STATE error() throws IOException { private STATE state = STATE.NOT_STARTED; /** - * + * @param configuration Hadoop configuration + * @param schema the schema of the data + * @param file the file to write to + * @throws IOException if the file can not be created + */ + public ParquetFileWriter(Configuration configuration, MessageType schema, + Path file) throws IOException { + this(configuration, schema, file, Mode.CREATE); + } + + /** + * @param configuration Hadoop configuration * @param schema the schema of the data - * @param out the file to write to - * @param codec the codec to use to compress blocks + * @param file the file to write to + * @param mode file creation mode * @throws IOException if the file can not be created */ - public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException { + public ParquetFileWriter(Configuration configuration, MessageType schema, + Path file, Mode mode) throws IOException { super(); this.schema = schema; FileSystem fs = file.getFileSystem(configuration); - this.out = fs.create(file, false); + boolean overwriteFlag = (mode == Mode.OVERWRITE); + this.out = fs.create(file, overwriteFlag); } /** diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetWriter.java index 41f27ed882..bf74814d71 100644 --- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetWriter.java @@ -169,11 +169,44 @@ public ParquetWriter( boolean validating, WriterVersion writerVersion, Configuration conf) throws IOException { + this(file, ParquetFileWriter.Mode.CREATE, writeSupport, + compressionCodecName, blockSize, pageSize, dictionaryPageSize, + enableDictionary, validating, writerVersion, conf); + } + /** + * Create a new ParquetWriter. + * + * @param file the file to create + * @param mode file creation mode + * @param writeSupport the implementation to write a record to a RecordConsumer + * @param compressionCodecName the compression codec to use + * @param blockSize the block size threshold + * @param pageSize the page size threshold + * @param dictionaryPageSize the page size threshold for the dictionary pages + * @param enableDictionary to turn dictionary encoding on + * @param validating to turn on validation using the schema + * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion} + * @param conf Hadoop configuration to use while accessing the filesystem + * @throws IOException + */ + public ParquetWriter( + Path file, + ParquetFileWriter.Mode mode, + WriteSupport writeSupport, + CompressionCodecName compressionCodecName, + int blockSize, + int pageSize, + int dictionaryPageSize, + boolean enableDictionary, + boolean validating, + WriterVersion writerVersion, + Configuration conf) throws IOException { WriteSupport.WriteContext writeContext = writeSupport.init(conf); MessageType schema = writeContext.getSchema(); - ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file); + ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file, + mode); fileWriter.start(); CodecFactory codecFactory = new CodecFactory(conf); diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java index 27753033ab..2a0e334741 100644 --- a/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestParquetFileWriter.java @@ -60,6 +60,39 @@ public class TestParquetFileWriter { private static final Log LOG = Log.getLog(TestParquetFileWriter.class); private String writeSchema; + @Test + public void testWriteMode() throws Exception { + File testDir = new File("target/test/TestParquetFileWriter/"); + testDir.mkdirs(); + File testFile = new File(testDir, "testParquetFile"); + testFile = testFile.getAbsoluteFile(); + testFile.createNewFile(); + MessageType schema = MessageTypeParser.parseMessageType( + "message m { required group a {required binary b;} required group " + + "c { required int64 d; }}"); + Configuration conf = new Configuration(); + + ParquetFileWriter writer = null; + boolean exceptionThrown = false; + Path path = new Path(testFile.toURI()); + try { + writer = new ParquetFileWriter(conf, schema, path, + ParquetFileWriter.Mode.CREATE); + } catch(IOException ioe1) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + exceptionThrown = false; + try { + writer = new ParquetFileWriter(conf, schema, path, + ParquetFileWriter.Mode.OVERWRITE); + } catch(IOException ioe2) { + exceptionThrown = true; + } + assertTrue(!exceptionThrown); + testFile.delete(); + } + @Test public void testWriteRead() throws Exception {