Merge remote-tracking branch 'apache-github/master' into SPARK-23362
tdas committed Feb 9, 2018
2 parents 3ed2a50 + 4df84c3 commit 05c9d20
Showing 26 changed files with 135 additions and 96 deletions.
@@ -46,9 +46,12 @@ private boolean shouldPool(long size) {

@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
if (shouldPool(size)) {
int numWords = (int) ((size + 7) / 8);
long alignedSize = numWords * 8L;
assert (alignedSize >= size);
if (shouldPool(alignedSize)) {
synchronized (this) {
final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool != null) {
while (!pool.isEmpty()) {
final WeakReference<long[]> arrayReference = pool.pop();
@@ -62,11 +65,11 @@ public MemoryBlock allocate(long size) throws OutOfMemoryError {
return memory;
}
}
bufferPoolsBySize.remove(size);
bufferPoolsBySize.remove(alignedSize);
}
}
}
long[] array = new long[(int) ((size + 7) / 8)];
long[] array = new long[numWords];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
@@ -98,12 +101,13 @@ public void free(MemoryBlock memory) {
long[] array = (long[]) memory.obj;
memory.setObjAndOffset(null, 0);

if (shouldPool(size)) {
long alignedSize = ((size + 7) / 8) * 8;
if (shouldPool(alignedSize)) {
synchronized (this) {
LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(size);
LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
if (pool == null) {
pool = new LinkedList<>();
bufferPoolsBySize.put(size, pool);
bufferPoolsBySize.put(alignedSize, pool);
}
pool.add(new WeakReference<>(array));
}
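Note: the allocate()/free() changes above round every requested size up to whole 8-byte words and key bufferPoolsBySize by that aligned size, so two requests that differ only within the last word land in the same pool bucket (which is what the new heapMemoryReuse test below relies on). A minimal sketch of that arithmetic, using a hypothetical helper name rather than anything in Spark:

object AlignedSizeSketch {
  // Round a requested byte size up to a whole number of 8-byte words,
  // mirroring the pooling key computed in HeapMemoryAllocator above.
  def alignedSize(requestedBytes: Long): Long = ((requestedBytes + 7) / 8) * 8

  def main(args: Array[String]): Unit = {
    assert(alignedSize(513) == 520)                                      // 65 words * 8 bytes
    assert(alignedSize(1024 * 1024 + 1) == alignedSize(1024 * 1024 + 7)) // same pool bucket
  }
}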
@@ -17,6 +17,7 @@

package org.apache.spark.unsafe;

import org.apache.spark.unsafe.memory.HeapMemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryBlock;

@@ -134,4 +135,25 @@ public void memoryDebugFillEnabledInTest() {
MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
MemoryAllocator.UNSAFE.free(offheap);
}

@Test
public void heapMemoryReuse() {
MemoryAllocator heapMem = new HeapMemoryAllocator();
// The size is less than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`, so new memory is allocated every time.
MemoryBlock onheap1 = heapMem.allocate(513);
Object obj1 = onheap1.getBaseObject();
heapMem.free(onheap1);
MemoryBlock onheap2 = heapMem.allocate(514);
Assert.assertNotEquals(obj1, onheap2.getBaseObject());

// The size is greater than `HeapMemoryAllocator.POOLING_THRESHOLD_BYTES`,
// so the previously released memory is reused.
MemoryBlock onheap3 = heapMem.allocate(1024 * 1024 + 1);
Assert.assertEquals(onheap3.size(), 1024 * 1024 + 1);
Object obj3 = onheap3.getBaseObject();
heapMem.free(onheap3);
MemoryBlock onheap4 = heapMem.allocate(1024 * 1024 + 7);
Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7);
Assert.assertEquals(obj3, onheap4.getBaseObject());
}
}
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.6
@@ -182,7 +182,7 @@ slf4j-api-1.7.16.jar
slf4j-log4j12-1.7.16.jar
snakeyaml-1.15.jar
snappy-0.2.jar
snappy-java-1.1.2.6.jar
snappy-java-1.1.7.1.jar
spire-macros_2.11-0.13.0.jar
spire_2.11-0.13.0.jar
stax-api-1.0-2.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7
@@ -183,7 +183,7 @@ slf4j-api-1.7.16.jar
slf4j-log4j12-1.7.16.jar
snakeyaml-1.15.jar
snappy-0.2.jar
snappy-java-1.1.2.6.jar
snappy-java-1.1.7.1.jar
spire-macros_2.11-0.13.0.jar
spire_2.11-0.13.0.jar
stax-api-1.0-2.jar
@@ -28,18 +28,15 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.{ContinuousReadSupport, MicroBatchReadSupport}
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
import org.apache.spark.sql.sources.v2.writer.StreamWriteSupport
import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

/**
* The provider class for the [[KafkaSource]]. This provider is designed such that it throws
* The provider class for all Kafka readers and writers. It is designed such that it throws
* IllegalArgumentException when the Kafka Dataset is created, so that it can catch
* missing options even before the query is started.
*/
@@ -56,27 +53,13 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def shortName(): String = "kafka"

/**
* Returns the name and schema of the source. In addition, it also verifies whether the options
* are correct and sufficient to create the [[KafkaSource]] when the query is started.
*/
def sourceSchema(
sqlContext: SQLContext,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, StructType) = {
validateStreamOptions(parameters)
require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one")
(shortName(), KafkaOffsetReader.kafkaSchema)
}

/**
* Creates a [[MicroBatchReader]] to read batches of data from this data source in a
* streaming query.
* Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader]] to read batches
* of Kafka data in a micro-batch streaming query.
*/
def createMicroBatchReader(
schema: Optional[StructType],
metadataPath: String,
options: DataSourceOptions): MicroBatchReader = {
options: DataSourceOptions): KafkaMicroBatchReader = {

val parameters = options.asMap().asScala.toMap
validateStreamOptions(parameters)
@@ -111,6 +94,10 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
failOnDataLoss(caseInsensitiveParams))
}

/**
* Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousDataReader]] to read
* Kafka data in a continuous streaming query.
*/
override def createContinuousReader(
schema: Optional[StructType],
metadataPath: String,
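Note: for data sources outside Spark, the practical effect of this file (and of the interface moves further down) is that the streaming mix-in traits now live directly in org.apache.spark.sql.sources.v2 instead of its reader/writer subpackages. A rough sketch of a custom provider against the relocated MicroBatchReadSupport; ExampleMicroBatchSource and the parameter name checkpointLocation are assumptions, only the import locations and the method shape are taken from the hunks in this commit:

import java.util.Optional

import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport}
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
import org.apache.spark.sql.types.StructType

// Hypothetical example source: the reader construction itself is elided (???).
class ExampleMicroBatchSource extends DataSourceV2 with MicroBatchReadSupport {
  override def createMicroBatchReader(
      schema: Optional[StructType],
      checkpointLocation: String,
      options: DataSourceOptions): MicroBatchReader = ???
}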
2 changes: 1 addition & 1 deletion pom.xml
@@ -160,7 +160,7 @@
<codehaus.jackson.version>1.9.13</codehaus.jackson.version>
<fasterxml.jackson.version>2.6.7</fasterxml.jackson.version>
<fasterxml.jackson.databind.version>2.6.7.1</fasterxml.jackson.databind.version>
<snappy.version>1.1.2.6</snappy.version>
<snappy.version>1.1.7.1</snappy.version>
<netlib.java.version>1.1.2</netlib.java.version>
<calcite.version>1.2.0-incubating</calcite.version>
<commons-codec.version>1.10</commons-codec.version>
2 changes: 1 addition & 1 deletion python/pyspark/sql/tests.py
@@ -2860,7 +2860,7 @@ def test_create_dataframe_required_pandas_not_found(self):
with QuietTest(self.sc):
with self.assertRaisesRegexp(
ImportError,
'(Pandas >= .* must be installed|No module named pandas)'):
"(Pandas >= .* must be installed|No module named '?pandas'?)"):
import pandas as pd
from datetime import datetime
pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
@@ -15,13 +15,11 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;
package org.apache.spark.sql.sources.v2;

import java.util.Optional;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
import org.apache.spark.sql.types.StructType;

@@ -15,13 +15,11 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;
package org.apache.spark.sql.sources.v2;

import java.util.Optional;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.types.StructType;

@@ -15,12 +15,11 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.writer;
package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;
@@ -21,6 +21,7 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.StreamWriteSupport;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;
@@ -22,7 +22,7 @@ import java.util.Locale
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.DDLUtils
@@ -178,7 +178,8 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi

c.copy(
tableDesc = existingTable,
query = Some(newQuery))
query = Some(DDLPreprocessingUtils.castAndRenameQueryOutput(
newQuery, existingTable.schema.toAttributes, conf)))

// Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
// config, and do various checks:
@@ -316,7 +317,7 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[Logi
* table. It also does data type casting and field renaming, to make sure that the columns to be
* inserted have the correct data type and fields have the correct names.
*/
case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
private def preprocess(
insert: InsertIntoTable,
tblName: String,
@@ -336,6 +337,8 @@
s"including ${staticPartCols.size} partition column(s) having constant value(s).")
}

val newQuery = DDLPreprocessingUtils.castAndRenameQueryOutput(
insert.query, expectedColumns, conf)
if (normalizedPartSpec.nonEmpty) {
if (normalizedPartSpec.size != partColNames.length) {
throw new AnalysisException(
@@ -346,37 +349,11 @@
""".stripMargin)
}

castAndRenameChildOutput(insert.copy(partition = normalizedPartSpec), expectedColumns)
insert.copy(query = newQuery, partition = normalizedPartSpec)
} else {
// All partition columns are dynamic because the InsertIntoTable command does
// not explicitly specify partitioning columns.
castAndRenameChildOutput(insert, expectedColumns)
.copy(partition = partColNames.map(_ -> None).toMap)
}
}

private def castAndRenameChildOutput(
insert: InsertIntoTable,
expectedOutput: Seq[Attribute]): InsertIntoTable = {
val newChildOutput = expectedOutput.zip(insert.query.output).map {
case (expected, actual) =>
if (expected.dataType.sameType(actual.dataType) &&
expected.name == actual.name &&
expected.metadata == actual.metadata) {
actual
} else {
// Renaming is needed for handling the following cases like
// 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2
// 2) Target tables have column metadata
Alias(cast(actual, expected.dataType), expected.name)(
explicitMetadata = Option(expected.metadata))
}
}

if (newChildOutput == insert.query.output) {
insert
} else {
insert.copy(query = Project(newChildOutput, insert.query))
insert.copy(query = newQuery, partition = partColNames.map(_ -> None).toMap)
}
}

@@ -491,3 +468,36 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
}
}
}

object DDLPreprocessingUtils {

/**
* Adjusts the name and data type of the input query output columns, to match the expectation.
*/
def castAndRenameQueryOutput(
query: LogicalPlan,
expectedOutput: Seq[Attribute],
conf: SQLConf): LogicalPlan = {
val newChildOutput = expectedOutput.zip(query.output).map {
case (expected, actual) =>
if (expected.dataType.sameType(actual.dataType) &&
expected.name == actual.name &&
expected.metadata == actual.metadata) {
actual
} else {
// Renaming is needed for handling the following cases like
// 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2
// 2) Target tables have column metadata
Alias(
Cast(actual, expected.dataType, Option(conf.sessionLocalTimeZone)),
expected.name)(explicitMetadata = Option(expected.metadata))
}
}

if (newChildOutput == query.output) {
query
} else {
Project(newChildOutput, query)
}
}
}
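Note: the net effect of the rule changes above is that both CREATE TABLE ... AS SELECT into an existing table and INSERT INTO now funnel through the shared castAndRenameQueryOutput helper, which wraps the query in a Project that casts and renames only the output columns that do not already match the target schema. A simplified, self-contained sketch of that decision; CastAndRenameSketch, Col and castAndRename are toy stand-ins, not Spark's Catalyst classes:

object CastAndRenameSketch {
  // Col stands in for Catalyst's Attribute in this toy model.
  case class Col(name: String, dataType: String)

  // Mirror of the decision in castAndRenameQueryOutput: keep a column as-is when
  // name and type (and, in Spark, metadata) already match, otherwise cast and rename it.
  def castAndRename(expected: Seq[Col], actual: Seq[Col]): Seq[Col] =
    expected.zip(actual).map { case (exp, act) =>
      if (exp.dataType == act.dataType && exp.name == act.name) act
      else Col(exp.name, exp.dataType) // Alias(Cast(act, exp.dataType), exp.name) in Spark
    }

  def main(args: Array[String]): Unit = {
    // e.g. INSERT INTO tab(a STRING, b INT) SELECT 1, 2: both output columns are adjusted.
    val adjusted = castAndRename(
      expected = Seq(Col("a", "string"), Col("b", "int")),
      actual = Seq(Col("1", "int"), Col("2", "int")))
    assert(adjusted == Seq(Col("a", "string"), Col("b", "int")))
  }
}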
@@ -29,10 +29,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.MicroBatchReadSupport
import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
import org.apache.spark.sql.sources.v2.writer.{StreamWriteSupport, SupportsWriteInternalRow}
import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.{Clock, Utils}

@@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.ContinuousReadSupport
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
import org.apache.spark.sql.types._
import org.apache.spark.util.{ManualClock, SystemClock}
@@ -25,8 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.reader.ContinuousReadSupport
import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2}

object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
@@ -20,8 +20,7 @@ package org.apache.spark.sql.execution.streaming
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
import org.apache.spark.sql.sources.v2.writer.StreamWriteSupport
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@@ -31,10 +31,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.ContinuousReadSupport
import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.StreamWriteSupport
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{Clock, Utils}
