Skip to content

Commit

Permalink
Using options of S3FileSystem in ParquetSink to pass s3-credentials t…
Browse files Browse the repository at this point in the history
…o ParquetWriter #373
  • Loading branch information
Florian Witteler committed Feb 28, 2018
1 parent 7cd7d3f commit 598421c
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ case class ParquetSink(path: Path, options: ParquetWriteOptions = ParquetWriteOp
if (options.overwrite && fs.exists(path))
fs.delete(path, false)

val writer = RowParquetWriterFn(path, schema, options.metadata, options.dictionary, options.roundingMode)
val writer = RowParquetWriterFn(path, schema, options.metadata, options.dictionary, options.roundingMode, fs.getConf)

override def write(row: Row): Unit = {
writer.write(row)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ object RowParquetWriterFn {
schema: StructType,
metadata: Map[String, String],
dictionary: Boolean,
roundingMode: RoundingMode): ParquetWriter[Row] = {
roundingMode: RoundingMode,
fsConfig: Configuration): ParquetWriter[Row] = {
val config = ParquetWriterConfig()
val messageType = ParquetSchemaFns.toParquetMessageType(schema)
new RowParquetWriterBuilder(path, messageType, roundingMode, metadata)
Expand All @@ -46,6 +47,7 @@ object RowParquetWriterFn {
.withValidation(config.validating)
.withWriteMode(ParquetFileWriter.Mode.CREATE)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
.withConf(fsConfig)
.build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DecimalWriterTest extends FunSuite {
if (fileSystem.exists(path)) fileSystem.delete(path, false)

// Write out the decimal values
val parquetWriter = RowParquetWriterFn(path = path, schema = schema, metadata = Map.empty, dictionary = false, roundingMode = RoundingMode.UP)
val parquetWriter = RowParquetWriterFn(path = path, schema = schema, metadata = Map.empty, dictionary = false, roundingMode = RoundingMode.UP, fileSystem.getConf)
expectedBigDecimals.foreach { expectedBigDecimal =>
println(s"Writing row with value $expectedBigDecimal")
parquetWriter.write(Row.fromMap(schema, Map("bd" -> expectedBigDecimal)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ case class ParquetHiveDialect(options: ParquetWriteOptions = ParquetWriteOptions

private val _records = new AtomicInteger(0)
logger.debug(s"Creating parquet writer at $path")
private val writer = RowParquetWriterFn(path, schema, metadata, true, roundingMode)
private val writer = RowParquetWriterFn(path, schema, metadata, true, roundingMode, fs.getConf)

override def write(row: Row) {
require(row.values.nonEmpty, "Attempting to write an empty row")
Expand Down

0 comments on commit 598421c

Please sign in to comment.