From d12f8e041d0cd497db3f80e613ec1bafd5da734c Mon Sep 17 00:00:00 2001
From: Jacek Lewandowski
Date: Thu, 10 Sep 2015 15:46:01 +0200
Subject: [PATCH] SPARKC-241: Initial integration with Spark 1.5.0

---
 CHANGES.txt                                        |  1 +
 project/CassandraSparkBuild.scala                  |  3 +-
 project/Versions.scala                             |  2 +-
 .../demo/KafkaStreamingWordCountApp.scala          | 13 ++--
 .../spark/connector/embedded/SparkRepl.scala       |  3 +-
 .../embedded/EmbeddedCassandra.scala               | 65 +++++++++++++------
 .../spark/connector/types/TypeConverter.scala      |  1 +
 .../spark/connector/writer/SqlRowWriter.scala      |  2 +-
 .../sql/cassandra/CassandraCatalog.scala           | 20 ++----
 .../sql/cassandra/CassandraSQLContext.scala        | 20 +++---
 .../spark/sql/cassandra/CassandraSQLRow.scala      | 14 ++--
 .../sql/cassandra/DataTypeConverter.scala          |  4 +-
 12 files changed, 83 insertions(+), 65 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 88e6a7943..5cafdd089 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.5.0 M1
  * Added ability to build against unreleased Spark versions (SPARKC-242)
+ * Spark 1.5 initial integration (SPARKC-241)

 ********************************************************************************
diff --git a/project/CassandraSparkBuild.scala b/project/CassandraSparkBuild.scala
index 5cf74dd3a..8c9918b08 100644
--- a/project/CassandraSparkBuild.scala
+++ b/project/CassandraSparkBuild.scala
@@ -145,6 +145,7 @@ object Dependencies {
     /* To allow spark artifact inclusion in the demos at runtime, we set 'provided' below. */
     val sparkCore = "org.apache.spark" %% "spark-core" % Spark guavaExclude // ApacheV2
+    val sparkUnsafe = "org.apache.spark" %% "spark-unsafe" % Spark guavaExclude // ApacheV2
     val sparkStreaming = "org.apache.spark" %% "spark-streaming" % Spark guavaExclude // ApacheV2
     val sparkSql = "org.apache.spark" %% "spark-sql" % Spark sparkExclusions // ApacheV2
     val sparkCatalyst = "org.apache.spark" %% "spark-catalyst" % Spark sparkExclusions // ApacheV2
@@ -215,7 +216,7 @@ object Dependencies {
     val cassandra = Seq(cassandraClient, cassandraDriver)

-    val spark = Seq(sparkCore, sparkStreaming, sparkSql, sparkCatalyst, sparkHive)
+    val spark = Seq(sparkCore, sparkStreaming, sparkSql, sparkCatalyst, sparkHive, sparkUnsafe)

     val connector = testKit ++ metrics ++ jetty ++ logging ++ akka ++ cassandra ++ spark.map(_ % "provided") ++ Seq(
       commonsLang3, config, guava, jodaC, jodaT, lzf, jsr166e)
diff --git a/project/Versions.scala b/project/Versions.scala
index 8771c0a49..e8d65841f 100644
--- a/project/Versions.scala
+++ b/project/Versions.scala
@@ -56,7 +56,7 @@ object Versions {
   // and install in a local Maven repository. This is all done automatically, however it will work
   // only on Unix/OSX operating system. Windows users have to build and install Spark manually if the
   // desired version is not yet published into a public Maven repository.
- val Spark = "1.4.0" + val Spark = "1.5.0" val SparkJetty = "8.1.14.v20131031" val JSR166e = "1.1.0" diff --git a/spark-cassandra-connector-demos/kafka-streaming/src/main/scala/com/datastax/spark/connector/demo/KafkaStreamingWordCountApp.scala b/spark-cassandra-connector-demos/kafka-streaming/src/main/scala/com/datastax/spark/connector/demo/KafkaStreamingWordCountApp.scala index cf89a5894..c587023fa 100644 --- a/spark-cassandra-connector-demos/kafka-streaming/src/main/scala/com/datastax/spark/connector/demo/KafkaStreamingWordCountApp.scala +++ b/spark-cassandra-connector-demos/kafka-streaming/src/main/scala/com/datastax/spark/connector/demo/KafkaStreamingWordCountApp.scala @@ -1,19 +1,14 @@ package com.datastax.spark.connector.demo -import com.datastax.spark.connector.embedded.Event.WordCount - -import scala.sys.process._ -import scala.util.Try import kafka.serializer.StringDecoder -import org.apache.spark.{Logging, SparkContext, SparkConf} import org.apache.spark.storage.StorageLevel -import org.apache.spark.SparkContext._ import org.apache.spark.streaming._ -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.kafka._ +import org.apache.spark.streaming.kafka.KafkaUtils +import org.apache.spark.{Logging, SparkConf, SparkContext} + import com.datastax.spark.connector.cql.CassandraConnector +import com.datastax.spark.connector.embedded.Event.WordCount import com.datastax.spark.connector.embedded._ -import com.datastax.spark.connector._ import com.datastax.spark.connector.streaming._ /** diff --git a/spark-cassandra-connector-embedded/scala-2.11/src/main/scala/com/datastax/spark/connector/embedded/SparkRepl.scala b/spark-cassandra-connector-embedded/scala-2.11/src/main/scala/com/datastax/spark/connector/embedded/SparkRepl.scala index bb07706a1..0370c62f6 100644 --- a/spark-cassandra-connector-embedded/scala-2.11/src/main/scala/com/datastax/spark/connector/embedded/SparkRepl.scala +++ b/spark-cassandra-connector-embedded/scala-2.11/src/main/scala/com/datastax/spark/connector/embedded/SparkRepl.scala @@ -4,7 +4,8 @@ import java.io.{PrintWriter, StringWriter, StringReader, BufferedReader} import java.net.URLClassLoader import scala.collection.mutable.ArrayBuffer -import scala.tools.nsc.interpreter.SparkILoop + +import org.apache.spark.repl.SparkILoop trait SparkRepl { diff --git a/spark-cassandra-connector-embedded/src/main/scala/com/datastax/spark/connector/embedded/EmbeddedCassandra.scala b/spark-cassandra-connector-embedded/src/main/scala/com/datastax/spark/connector/embedded/EmbeddedCassandra.scala index 887765dfc..50d178ecb 100644 --- a/spark-cassandra-connector-embedded/src/main/scala/com/datastax/spark/connector/embedded/EmbeddedCassandra.scala +++ b/spark-cassandra-connector-embedded/src/main/scala/com/datastax/spark/connector/embedded/EmbeddedCassandra.scala @@ -4,11 +4,12 @@ import java.net.InetAddress import org.apache.commons.configuration.ConfigurationException +import scala.collection.JavaConversions._ /** A utility trait for integration testing. * Manages *one* single Cassandra server at a time and enables switching its configuration. * This is not thread safe, and test suites must not be run in parallel, - * because they will "steal" the server.*/ + * because they will "steal" the server. */ trait EmbeddedCassandra { /** Implementation hook. */ @@ -19,7 +20,7 @@ trait EmbeddedCassandra { * the state (including data) of the previously running cassandra cluster is lost. 
    * @param configTemplates name of the cassandra.yaml template resources
    * @param forceReload if set to true, the server will be reloaded fresh
-   *                    even if the configuration didn't change */
+   *                    even if the configuration didn't change */
   def useCassandraConfig(configTemplates: Seq[String], forceReload: Boolean = false) {
     import EmbeddedCassandra._
     import UserDefinedProperty._
@@ -49,32 +50,40 @@ object UserDefinedProperty {

   trait TypedProperty {
     type ValueType
+
     def convertValueFromString(str: String): ValueType
+
     def checkValueType(obj: Any): ValueType
   }

   trait IntProperty extends TypedProperty {
     type ValueType = Int
+
     def convertValueFromString(str: String) = str.toInt
+
     def checkValueType(obj: Any) = obj match {
       case x: Int => x
-      case _ => throw new ClassCastException (s"Expected Int but found ${obj.getClass.getName}")
+      case _ => throw new ClassCastException(s"Expected Int but found ${obj.getClass.getName}")
     }
   }

   trait InetAddressProperty extends TypedProperty {
     type ValueType = InetAddress
+
     def convertValueFromString(str: String) = InetAddress.getByName(str)
+
     def checkValueType(obj: Any) = obj match {
       case x: InetAddress => x
-      case _ => throw new ClassCastException (s"Expected InetAddress but found ${obj.getClass.getName}")
+      case _ => throw new ClassCastException(s"Expected InetAddress but found ${obj.getClass.getName}")
     }
   }

   abstract sealed class NodeProperty(val propertyName: String) extends TypedProperty
+
   case object HostProperty extends NodeProperty("IT_TEST_CASSANDRA_HOSTS") with InetAddressProperty
+
   case object PortProperty extends NodeProperty("IT_TEST_CASSANDRA_PORTS") with IntProperty

   private def getValueSeq(propertyName: String): Seq[String] = {
@@ -92,7 +101,7 @@ object UserDefinedProperty {
   def getProperty(nodeProperty: NodeProperty): Option[String] =
     sys.env.get(nodeProperty.propertyName)
-
+
   def getPropertyOrThrowIfNotFound(nodeProperty: NodeProperty): String =
     getProperty(nodeProperty).getOrElse(
       throw new ConfigurationException(s"Missing ${nodeProperty.propertyName} in system environment"))
@@ -101,10 +110,10 @@ object EmbeddedCassandra {
   import UserDefinedProperty._
-
-  private def countCommaSeparatedItemsIn(s: String): Int =
+
+  private def countCommaSeparatedItemsIn(s: String): Int =
     s.count(_ == ',')
-
+
   getProperty(HostProperty) match {
     case None =>
     case Some(hostsStr) =>
@@ -124,23 +133,27 @@ object EmbeddedCassandra {
     require(hosts.isEmpty || index < hosts.length, s"$index index is overflow the size of ${hosts.length}")
     val host = getHost(index).getHostAddress
     Map(
-      "seeds" -> host,
-      "storage_port" -> getStoragePort(index).toString,
-      "ssl_storage_port" -> getSslStoragePort(index).toString,
+      "seeds" -> host,
+      "storage_port" -> getStoragePort(index).toString,
+      "ssl_storage_port" -> getSslStoragePort(index).toString,
       "native_transport_port" -> getPort(index).toString,
-      "jmx_port" -> getJmxPort(index).toString,
-      "rpc_address" -> host,
-      "listen_address" -> host,
-      "cluster_name" -> getClusterName(index),
-      "keystore_path" -> ClassLoader.getSystemResource("keystore").getPath)
+      "jmx_port" -> getJmxPort(index).toString,
+      "rpc_address" -> host,
+      "listen_address" -> host,
+      "cluster_name" -> getClusterName(index),
+      "keystore_path" -> ClassLoader.getSystemResource("keystore").getPath)
   }

   def getStoragePort(index: Integer) = 7000 + index
+
   def getSslStoragePort(index: Integer) = 7100 + index
+
   def getJmxPort(index: Integer) = CassandraRunner.DefaultJmxPort + index
+
   def getClusterName(index: Integer) = s"Test Cluster$index"
   def getHost(index: Integer): InetAddress = getNodeProperty(index, HostProperty)
+
   def getPort(index: Integer) = getNodeProperty(index, PortProperty)

   private def getNodeProperty(index: Integer, nodeProperty: NodeProperty): nodeProperty.ValueType = {
@@ -153,7 +166,7 @@ object EmbeddedCassandra {
       case _ => throw new RuntimeException(s"$index index is overflow the size of ${hosts.size}")
     }
   }
-  }
+  }

   Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
     override def run() = cassandraRunners.flatten.foreach(_.destroy())
@@ -164,9 +177,10 @@ private[connector] class CassandraRunner(val configTemplate: String, props: Map[
   extends Embedded {

   import java.io.{File, FileOutputStream, IOException}
-  import org.apache.cassandra.io.util.FileUtils
-  import com.google.common.io.Files
+
+  import CassandraRunner._
+  import com.google.common.io.Files
+  import org.apache.cassandra.io.util.FileUtils

   val tempDir = mkdir(new File(Files.createTempDir(), "cassandra-driver-spark"))
   val workDir = mkdir(new File(tempDir, "cassandra"))
@@ -183,7 +197,16 @@ private[connector] class CassandraRunner(val configTemplate: String, props: Map[
     }
   }

-  private val classPath = System.getProperty("java.class.path")
+  private val classPath = sys.env.get("IT_CASSANDRA_PATH") match {
+    case Some(customCassandraDir) =>
+      val entries = (for (f <- Files.fileTreeTraverser().breadthFirstTraversal(new File(customCassandraDir, "lib")).toIterator
+                          if f.isDirectory || f.getName.endsWith(".jar")) yield {
+        f.getAbsolutePath
+      }).toList ::: new File(customCassandraDir, "conf") :: Nil
+      entries.mkString(File.pathSeparator)
+    case _ => System.getProperty("java.class.path")
+  }
+
   private val javaBin = System.getProperty("java.home") + "/bin/java"
   private val cassandraConfProperty = "-Dcassandra.config=file:" + confFile.toString
   private val superuserSetupDelayProperty = "-Dcassandra.superuser_setup_delay_ms=0"
@@ -204,7 +227,7 @@ private[connector] class CassandraRunner(val configTemplate: String, props: Map[
     .inheritIO()
     .start()

-  val nativePort = props.get("native_transport_port").get.toInt
+  val nativePort = props.get("native_transport_port").get.toInt
   if (!waitForPortOpen(InetAddress.getByName(props.get("rpc_address").get), nativePort, 100000))
     throw new IOException("Failed to start Cassandra.")
diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala
index 98b1b2cc7..a0f419239 100644
--- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala
+++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/types/TypeConverter.scala
@@ -357,6 +357,7 @@ object TypeConverter {
       case x: java.lang.Integer => new java.math.BigInteger(x.toString)
       case x: java.lang.Long => new java.math.BigInteger(x.toString)
       case x: String => new java.math.BigInteger(x)
+      case x: java.math.BigDecimal if x.scale() <= 0 => x.toBigInteger
     }
   }
diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/SqlRowWriter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/SqlRowWriter.scala
index ded3a7c4f..bd6bd87bb 100644
--- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/SqlRowWriter.scala
+++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/SqlRowWriter.scala
@@ -2,7 +2,7 @@ package com.datastax.spark.connector.writer
 import com.datastax.spark.connector.ColumnRef
 import com.datastax.spark.connector.cql.TableDef
-import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.Row

 /** A [[RowWriter]] that can write SparkSQL `Row` objects. */
 class SqlRowWriter(val table: TableDef, val selectedColumns: IndexedSeq[ColumnRef])
diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraCatalog.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraCatalog.scala
index c7260c124..10392ea34 100644
--- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraCatalog.scala
+++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraCatalog.scala
@@ -2,23 +2,17 @@ package org.apache.spark.sql.cassandra

 import java.io.IOException

-import org.apache.spark.sql.catalyst.{SimpleCatalystConf, CatalystConf}
-
-import com.datastax.spark.connector.cql.{CassandraConnector, Schema}
-import com.google.common.cache.{CacheBuilder, CacheLoader}
-
 import scala.collection.JavaConversions._

-import com.google.common.cache.{LoadingCache, CacheBuilder, CacheLoader}
+import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.spark.Logging
-
+import org.apache.spark.sql.cassandra.CassandraSourceRelation._
 import org.apache.spark.sql.catalyst.analysis.Catalog
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
-import org.apache.spark.sql.sources.LogicalRelation
-
-import com.datastax.spark.connector.cql.{CassandraConnectorConf, CassandraConnector, Schema}
+import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf, TableIdentifier}
+import org.apache.spark.sql.execution.datasources.LogicalRelation

-import CassandraSourceRelation._
+import com.datastax.spark.connector.cql.{CassandraConnector, CassandraConnectorConf, Schema}

 private[cassandra] class CassandraCatalog(csc: CassandraSQLContext) extends Catalog with Logging {
@@ -136,7 +130,7 @@ private[cassandra] class CassandraCatalog(csc: CassandraSQLContext) extends Cata

   override val conf: CatalystConf = SimpleCatalystConf(caseSensitive)

-  def refreshTable(databaseName: String, tableName: String): Unit = {
-    cachedDataSourceTables.refresh(Seq(csc.getCluster, databaseName, tableName))
+  override def refreshTable(tableIdent: TableIdentifier): Unit = {
+    cachedDataSourceTables.refresh(tableIdent.toSeq)
   }
 }
diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLContext.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLContext.scala
index 5476c5a8f..ade516ddf 100644
--- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLContext.scala
+++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLContext.scala
@@ -2,15 +2,11 @@ package org.apache.spark.sql.cassandra

 import java.util.NoSuchElementException

-import org.apache.commons.lang.StringUtils
-import org.apache.spark.sql.sources.DataSourceStrategy
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.catalyst.analysis.OverrideCatalog
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.cassandra.CassandraSourceRelation._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.{DataFrame, Strategy, SQLContext}
-
-import CassandraSQLContext._
-import CassandraSourceRelation._
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.{DataFrame, SQLContext, Strategy}

 /** Allows to execute SQL queries against Cassandra and access results as
   * `SchemaRDD` collections. Predicate pushdown to Cassandra is supported.
   *
   * }}} */
 class CassandraSQLContext(sc: SparkContext) extends SQLContext(sc) {
-  import CassandraSQLContext._
+  import org.apache.spark.sql.cassandra.CassandraSQLContext._

   override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution(plan)
@@ -87,12 +83,12 @@ class CassandraSQLContext(sc: SparkContext) extends SQLContext(sc) {
     override val strategies: Seq[Strategy] = Seq(
       DataSourceStrategy,
       DDLStrategy,
-      TakeOrdered,
-      ParquetOperations,
+      TakeOrderedAndProject,
       InMemoryScans,
       HashAggregation,
+      Aggregation,
       LeftSemiJoin,
-      HashJoin,
+      EquiJoinSelection,
       BasicOperators,
       CartesianProduct,
       BroadcastNestedLoopJoin
diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala
index 727482812..13783a462 100644
--- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala
+++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala
@@ -1,15 +1,16 @@
 package org.apache.spark.sql.cassandra

+import java.math.BigInteger
 import java.sql.Timestamp
 import java.util.Date

-import org.apache.spark.sql.types.UTF8String
-
 import com.datastax.driver.core.{Row, ProtocolVersion}
 import com.datastax.spark.connector.GettableData
 import com.datastax.spark.connector.rdd.reader.{ThisRowReaderAsFactory, RowReader}
 import com.datastax.spark.connector.types.TypeConverter
-import org.apache.spark.sql.catalyst.expressions.{Row => SparkRow}
+import org.apache.spark.sql.{Row => SparkRow}
+import org.apache.spark.unsafe.types.UTF8String
+import java.math.{BigDecimal => JBigDecimal}

 final class CassandraSQLRow(val columnNames: IndexedSeq[String], val columnValues: IndexedSeq[AnyRef])
   extends GettableData with SparkRow with Serializable {
@@ -35,6 +36,10 @@ final class CassandraSQLRow(val columnNames: IndexedSeq[String], val columnValue
   override def getShort(i: Int) = get[Short](i)
   override def getInt(i: Int) = get[Int](i)
   override def getString(i: Int) = get[String](i)
+  override def get(i: Int) = get[Any](i)
+
+  override def isNullAt(i: Int): Boolean = super[GettableData].isNullAt(i)
+
   override def toSeq: Seq[Any] = columnValues
 }
@@ -47,7 +52,8 @@ object CassandraSQLRow {
       data(i) = GettableData.get(row, i)
       data(i) match {
         case date: Date => data.update(i, new Timestamp(date.getTime))
-        case str: String => data.update(i, UTF8String(str))
+        case bigInt: BigInteger => data.update(i, new JBigDecimal(bigInt))
+        case str: String => data.update(i, UTF8String.fromString(str))
         case set: Set[_] => data.update(i, set.toSeq)
         case _ =>
       }
diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/DataTypeConverter.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/DataTypeConverter.scala
index dc6be44c0..1e35935de 100644
--- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/DataTypeConverter.scala
+++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/DataTypeConverter.scala
@@ -25,8 +25,8 @@ object DataTypeConverter extends Logging {
     connector.types.FloatType -> catalystTypes.FloatType,
     connector.types.DoubleType -> catalystTypes.DoubleType,
-    connector.types.VarIntType -> catalystTypes.DecimalType(), // no native arbitrary-size integer type
-    connector.types.DecimalType -> catalystTypes.DecimalType(),
+    connector.types.VarIntType -> catalystTypes.DecimalType(38, 0), // no native arbitrary-size integer type
+    connector.types.DecimalType -> catalystTypes.DecimalType(38, 18),
     connector.types.TimestampType -> catalystTypes.TimestampType,
     connector.types.InetType -> InetAddressType,