Implementation of partitioning stream writer #52
Conversation
```scala
def write(dataFrame: DataFrame, offsetManager: OffsetManager): StreamingQuery = {
  if (dataFrame == null) {
```
In general, I have seen a lot of usage of null. How about we isolate this `x: Any == null` check somewhere else, so the "not nice" null usage in Scala is confined to one place?
I agree. We could also use `Predef.require` for these null checks, because most of them are actually preconditions of the function.
I have created #53
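For illustration, a precondition check with `Predef.require` might look like this. This is only a sketch: the parameter types are stubbed as `AnyRef` so the snippet is self-contained, and the messages are hypothetical.

```scala
// Sketch: replacing explicit null checks with Predef.require preconditions.
// DataFrame and OffsetManager are stubbed as AnyRef for self-containment.
object RequireSketch {
  def write(dataFrame: AnyRef, offsetManager: AnyRef): Unit = {
    // require throws IllegalArgumentException with the given message when
    // the precondition does not hold, replacing the manual null check.
    require(dataFrame != null, "Null DataFrame instance received.")
    require(offsetManager != null, "Null OffsetManager instance received.")
    // ... start the streaming query ...
  }
}
```

This keeps the "not nice" null comparison in one idiom and makes the precondition intent explicit.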
```scala
import scala.util.{Failure, Success, Try}

private[writer] abstract class AbstractParquetStreamWriter(destination: String, val extraConfOptions: Option[Map[String, String]]) extends StreamWriter(destination) {
```
Wouldn't an empty `Map` be easier to handle?
Could be. The `Option` here is more of a semantic marker to indicate that these options are not mandatory.
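To compare the two signatures, a minimal sketch (the helper names are hypothetical, not from the PR): with `Option[Map[String, String]]` every use site unwraps via `getOrElse`, whereas a plain `Map` with an empty default needs no unwrapping.

```scala
// Sketch: Option[Map[...]] vs. a plain Map with an empty default.
object ConfOptionsSketch {
  // Variant under review: the Option marks the options as not mandatory,
  // but each use site has to unwrap it.
  def applyOptions(extraConfOptions: Option[Map[String, String]]): Map[String, String] =
    extraConfOptions.getOrElse(Map.empty)

  // Suggested alternative: an empty Map default, no unwrapping needed.
  def applyOptionsDefault(extraConfOptions: Map[String, String] = Map.empty): Map[String, String] =
    extraConfOptions
}
```

Both express "no extra options"; the empty-Map variant trades the explicit absence marker for simpler call sites.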
```scala
private def parseConf(option: String): (String, String) = {
  val keyValue = option.split("=")
  if (keyValue.length == 2) {
```
I think general Java conf files are allowed to have more than one `=`, but only the first one acts as the delimiter.
We provide the following config property: `writer.parquet.extra.conf.1=key1=value1`.
That's why we check for the second `=`.
We could change it to `writer.parquet.extra.conf.key1=value1`.
However, I would not change it in this PR, because `AbstractParquetStreamWriter` was just copied from `ParquetStreamWriter`.
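If the delimiter behaviour were ever changed so that only the first `=` splits key and value, `split` with a limit of 2 would do it. This is a sketch of that alternative, not the code in this PR:

```scala
// Sketch: treat only the first '=' as the key/value delimiter,
// so values that themselves contain '=' survive intact.
object ParseConfSketch {
  def parseConf(option: String): (String, String) = {
    // limit 2: split at the first '=' only
    val keyValue = option.split("=", 2)
    if (keyValue.length == 2) (keyValue(0), keyValue(1))
    else throw new IllegalArgumentException(s"Invalid configuration: '$option'")
  }
}
```

With this variant, `key1=value1=x` parses as key `key1` and value `value1=x`, matching the "first `=` is the delimiter" convention of Java properties files.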
```scala
private val COL_VERSION = "hyperdrive_version"

if (StringUtils.isBlank(destination)) {
  throw new IllegalArgumentException(s"Invalid PARQUET destination: '$destination'")
```
`AbstractParquetStreamWriter` has the same check. Is that not enough?
```scala
behavior of "ParquetStreamWriter"

it should "write partitioned by date and version=1 where destination is empty" in {
```
You mean the destination dir is empty?
```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

trait SparkTestBase {
```
```scala
 */

package za.co.absa.hyperdrive.ingestor.implementation.writer.parquet
```
Unused imports:
- `org.apache.logging.log4j.LogManager`
- `StreamWriterFactory`
```scala
import java.time.format.DateTimeFormatter

import org.apache.commons.configuration2.Configuration
import org.apache.commons.lang3.StringUtils
```
Added new component:
`component.writer=za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetPartitioningStreamWriter`

Required configuration:
`writer.parquet.destination.directory` (same as for ParquetStreamWriter)

Optional configuration:
`writer.parquet.partitioning.report.date=yyyy-MM-dd`. For debugging or failure recovery purposes, this parameter determines `hyperdrive_date`, i.e. into which partition the data is written. Default: current date.
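As a sketch of the documented default behaviour (the current date formatted as `yyyy-MM-dd` when no report date is configured; the object and method names here are hypothetical, not taken from the PR):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Sketch: resolving the report date used for the "hyperdrive_date" partition.
// Falls back to the current date when nothing is configured.
object ReportDateSketch {
  private val reportDateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd")

  def resolveReportDate(configured: Option[String]): String =
    configured.getOrElse(LocalDate.now().format(reportDateFormat))
}
```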