[SPARK-27106][SQL] merge CaseInsensitiveStringMap and DataSourceOptions
## What changes were proposed in this pull request?

It's a little awkward to have two different classes (`CaseInsensitiveStringMap` and `DataSourceOptions`) to represent the options in the data source and catalog APIs.

This PR merges the two classes, keeping the name `CaseInsensitiveStringMap`, which is the more precise of the two.
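
For illustration only (not part of this patch), here is a minimal Scala sketch of how a v2 implementation consumes options after the merge. The option key and value are hypothetical; the calls themselves are the ones introduced or kept by this change:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical option; keys are stored lower-cased, so lookups are case-insensitive.
val options = new CaseInsensitiveStringMap(Map("maxOffsetsPerTrigger" -> "1000").asJava)

// Unlike DataSourceOptions.get, which returned a java.util.Optional,
// CaseInsensitiveStringMap.get returns the value or null, so a plain
// Option(...) wrapper is enough on the Scala side.
val maxOffsets: Option[Long] =
  Option(options.get("maxoffsetspertrigger")).map(_.toLong)   // Some(1000L)

// The class implements java.util.Map directly, so options.asScala.toMap
// replaces the old options.asMap().asScala.toMap.
val params: Map[String, String] = options.asScala.toMap
```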

## How was this patch tested?

Existing tests.

Closes #24025 from cloud-fan/option.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
cloud-fan committed Mar 13, 2019
1 parent 812ad55 · commit 2a80a4c
Showing 63 changed files with 363 additions and 558 deletions.
@@ -37,8 +37,7 @@ import org.apache.spark.sql.sources.v2.reader.streaming._
* @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
* read by per-task consumers generated later.
* @param kafkaParams String params for per-task Kafka consumers.
* @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
* are not Kafka consumer params.
* @param sourceOptions Params which are not Kafka consumer params.
* @param metadataPath Path to a directory this reader can use for writing metadata.
* @param initialOffsets The Kafka offsets to start reading data at.
* @param failOnDataLoss Flag indicating whether reading should fail in data loss
@@ -33,9 +33,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.UninterruptibleThread

/**
@@ -57,7 +57,7 @@ import org.apache.spark.util.UninterruptibleThread
private[kafka010] class KafkaMicroBatchStream(
kafkaOffsetReader: KafkaOffsetReader,
executorKafkaParams: ju.Map[String, Object],
options: DataSourceOptions,
options: CaseInsensitiveStringMap,
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
@@ -66,8 +66,7 @@ private[kafka010] class KafkaMicroBatchStream(
"kafkaConsumer.pollTimeoutMs",
SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)

private val maxOffsetsPerTrigger =
Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
private val maxOffsetsPerTrigger = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)

private val rangeCalculator = KafkaOffsetRangeCalculator(options)

@@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.util.CaseInsensitiveStringMap


/**
@@ -91,8 +91,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int

private[kafka010] object KafkaOffsetRangeCalculator {

def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator = {
val optionalValue = Option(options.get("minPartitions").orElse(null)).map(_.toInt)
def apply(options: CaseInsensitiveStringMap): KafkaOffsetRangeCalculator = {
val optionalValue = Option(options.get("minPartitions")).map(_.toInt)
new KafkaOffsetRangeCalculator(optionalValue)
}
}
@@ -38,6 +38,7 @@ import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* The provider class for all Kafka readers and writers. It is designed such that it throws
@@ -103,8 +104,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
failOnDataLoss(caseInsensitiveParams))
}

override def getTable(options: DataSourceOptions): KafkaTable = {
new KafkaTable(strategy(options.asMap().asScala.toMap))
override def getTable(options: CaseInsensitiveStringMap): KafkaTable = {
new KafkaTable(strategy(options.asScala.toMap))
}

/**
@@ -358,11 +359,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister

override def schema(): StructType = KafkaOffsetReader.kafkaSchema

override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
override def build(): Scan = new KafkaScan(options)
}

override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new WriteBuilder {
private var inputSchema: StructType = _

@@ -375,20 +376,20 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
import scala.collection.JavaConverters._

assert(inputSchema != null)
val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim)
val producerParams = kafkaParamsForProducer(options.asScala.toMap)
new KafkaStreamingWrite(topic, producerParams, inputSchema)
}
}
}
}

class KafkaScan(options: DataSourceOptions) extends Scan {
class KafkaScan(options: CaseInsensitiveStringMap) extends Scan {

override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema

override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
val parameters = options.asMap().asScala.toMap
val parameters = options.asScala.toMap
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
@@ -417,7 +418,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}

override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
val parameters = options.asMap().asScala.toMap
val parameters = options.asScala.toMap
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
@@ -41,10 +41,10 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.util.CaseInsensitiveStringMap

abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with KafkaTest {

@@ -1118,7 +1118,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
"kafka.bootstrap.servers" -> testUtils.brokerAddress,
"subscribe" -> topic
) ++ Option(minPartitions).map { p => "minPartitions" -> p}
val dsOptions = new DataSourceOptions(options.asJava)
val dsOptions = new CaseInsensitiveStringMap(options.asJava)
val table = provider.getTable(dsOptions)
val stream = table.newScanBuilder(dsOptions).build().toMicroBatchStream(dir.getAbsolutePath)
val inputPartitions = stream.planInputPartitions(
@@ -22,21 +22,21 @@ import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {

def testWithMinPartitions(name: String, minPartition: Int)
(f: KafkaOffsetRangeCalculator => Unit): Unit = {
val options = new DataSourceOptions(Map("minPartitions" -> minPartition.toString).asJava)
val options = new CaseInsensitiveStringMap(Map("minPartitions" -> minPartition.toString).asJava)
test(s"with minPartition = $minPartition: $name") {
f(KafkaOffsetRangeCalculator(options))
}
}


test("with no minPartition: N TopicPartitions to N offset ranges") {
val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty())
val calc = KafkaOffsetRangeCalculator(CaseInsensitiveStringMap.empty())
assert(
calc.getRanges(
fromOffsets = Map(tp1 -> 1),
@@ -64,7 +64,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
}

test("with no minPartition: empty ranges ignored") {
val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty())
val calc = KafkaOffsetRangeCalculator(CaseInsensitiveStringMap.empty())
assert(
calc.getRanges(
fromOffsets = Map(tp1 -> 1, tp2 -> 1),
@@ -31,19 +31,20 @@
* This is used to pass options to v2 implementations to ensure consistent case insensitivity.
* <p>
* Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return
* keys converted to lower case.
* keys converted to lower case. This map doesn't allow null key.
*/
@Experimental
public class CaseInsensitiveStringMap implements Map<String, String> {

public static CaseInsensitiveStringMap empty() {
return new CaseInsensitiveStringMap();
return new CaseInsensitiveStringMap(new HashMap<>(0));
}

private final Map<String, String> delegate;

private CaseInsensitiveStringMap() {
this.delegate = new HashMap<>();
public CaseInsensitiveStringMap(Map<String, String> originalMap) {
this.delegate = new HashMap<>(originalMap.size());
putAll(originalMap);
}

@Override
@@ -56,9 +57,13 @@ public boolean isEmpty() {
return delegate.isEmpty();
}

private String toLowerCase(Object key) {
return key.toString().toLowerCase(Locale.ROOT);
}

@Override
public boolean containsKey(Object key) {
return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT));
return delegate.containsKey(toLowerCase(key));
}

@Override
@@ -68,17 +73,17 @@ public boolean containsValue(Object value) {

@Override
public String get(Object key) {
return delegate.get(key.toString().toLowerCase(Locale.ROOT));
return delegate.get(toLowerCase(key));
}

@Override
public String put(String key, String value) {
return delegate.put(key.toLowerCase(Locale.ROOT), value);
return delegate.put(toLowerCase(key), value);
}

@Override
public String remove(Object key) {
return delegate.remove(key.toString().toLowerCase(Locale.ROOT));
return delegate.remove(toLowerCase(key));
}

@Override
@@ -107,4 +112,49 @@ public Collection<String> values() {
public Set<Map.Entry<String, String>> entrySet() {
return delegate.entrySet();
}

/**
* Returns the boolean value to which the specified key is mapped,
* or defaultValue if there is no mapping for the key. The key match is case-insensitive.
*/
public boolean getBoolean(String key, boolean defaultValue) {
String value = get(key);
// We can't use `Boolean.parseBoolean` here, as it returns false for invalid strings.
if (value == null) {
return defaultValue;
} else if (value.equalsIgnoreCase("true")) {
return true;
} else if (value.equalsIgnoreCase("false")) {
return false;
} else {
throw new IllegalArgumentException(value + " is not a boolean string.");
}
}

/**
* Returns the integer value to which the specified key is mapped,
* or defaultValue if there is no mapping for the key. The key match is case-insensitive.
*/
public int getInt(String key, int defaultValue) {
String value = get(key);
return value == null ? defaultValue : Integer.parseInt(value);
}

/**
* Returns the long value to which the specified key is mapped,
* or defaultValue if there is no mapping for the key. The key match is case-insensitive.
*/
public long getLong(String key, long defaultValue) {
String value = get(key);
return value == null ? defaultValue : Long.parseLong(value);
}

/**
* Returns the double value to which the specified key is mapped,
* or defaultValue if there is no mapping for the key. The key match is case-insensitive.
*/
public double getDouble(String key, double defaultValue) {
String value = get(key);
return value == null ? defaultValue : Double.parseDouble(value);
}
}
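
For reference, and not part of this commit, a short Scala usage sketch of the typed getters added above; the keys and values below are made up for illustration. Note that `getBoolean` deliberately rejects strings other than "true"/"false" instead of silently returning `false`, as the comment in the diff explains.

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical options, mixed-case on purpose.
val opts = new CaseInsensitiveStringMap(
  Map("Path" -> "/tmp/data", "BatchSize" -> "10", "cacheEnabled" -> "TRUE").asJava)

opts.containsKey("path")               // true: keys are compared lower-cased
opts.getInt("batchsize", 100)          // 10: parsed from the stored value
opts.getLong("missingKey", 60L)        // 60: default used when the key is absent
opts.getBoolean("CACHEENABLED", false) // true: "TRUE" matches case-insensitively
// opts.getBoolean("path", false)      // would throw IllegalArgumentException,
//                                     // because "/tmp/data" is not a boolean string
```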

This file was deleted.
