SPARKC-241: Initial integration with Spark 1.5.0
jacek-lewandowski committed Sep 10, 2015
1 parent b3319fb commit d12f8e0
Showing 12 changed files with 83 additions and 65 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,5 +1,6 @@
1.5.0 M1
* Added ability to build against unreleased Spark versions (SPARKC-242)
* Spark 1.5 initial integration (SPARKC-241)

********************************************************************************

3 changes: 2 additions & 1 deletion project/CassandraSparkBuild.scala
@@ -145,6 +145,7 @@ object Dependencies {

/* To allow spark artifact inclusion in the demos at runtime, we set 'provided' below. */
val sparkCore = "org.apache.spark" %% "spark-core" % Spark guavaExclude // ApacheV2
val sparkUnsafe = "org.apache.spark" %% "spark-unsafe" % Spark guavaExclude // ApacheV2
val sparkStreaming = "org.apache.spark" %% "spark-streaming" % Spark guavaExclude // ApacheV2
val sparkSql = "org.apache.spark" %% "spark-sql" % Spark sparkExclusions // ApacheV2
val sparkCatalyst = "org.apache.spark" %% "spark-catalyst" % Spark sparkExclusions // ApacheV2
@@ -215,7 +216,7 @@ object Dependencies {

val cassandra = Seq(cassandraClient, cassandraDriver)

val spark = Seq(sparkCore, sparkStreaming, sparkSql, sparkCatalyst, sparkHive)
val spark = Seq(sparkCore, sparkStreaming, sparkSql, sparkCatalyst, sparkHive, sparkUnsafe)

val connector = testKit ++ metrics ++ jetty ++ logging ++ akka ++ cassandra ++ spark.map(_ % "provided") ++ Seq(
commonsLang3, config, guava, jodaC, jodaT, lzf, jsr166e)
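A side note on the dependency block above: every Spark artifact, including the newly added spark-unsafe module, reaches the connector build with "provided" scope via spark.map(_ % "provided"), so the connector compiles against Spark without bundling it. A minimal sbt sketch of that pattern (the version string matches the Versions.scala bump below; the coordinates follow the hunk above):

val sparkVersion = "1.5.0"

// Spark modules are compile-time dependencies only; the Spark runtime supplies them.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % sparkVersion % "provided",
  "org.apache.spark" %% "spark-unsafe" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"    % sparkVersion % "provided"
)
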
2 changes: 1 addition & 1 deletion project/Versions.scala
@@ -56,7 +56,7 @@ object Versions {
// and install in a local Maven repository. This is all done automatically, however it will work
// only on Unix/OSX operating system. Windows users have to build and install Spark manually if the
// desired version is not yet published into a public Maven repository.
val Spark = "1.4.0"
val Spark = "1.5.0"
val SparkJetty = "8.1.14.v20131031"
val JSR166e = "1.1.0"

@@ -1,19 +1,14 @@
package com.datastax.spark.connector.demo

import com.datastax.spark.connector.embedded.Event.WordCount

import scala.sys.process._
import scala.util.Try
import kafka.serializer.StringDecoder
import org.apache.spark.{Logging, SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{Logging, SparkConf, SparkContext}

import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.embedded.Event.WordCount
import com.datastax.spark.connector.embedded._
import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._

/**
@@ -4,7 +4,8 @@ import java.io.{PrintWriter, StringWriter, StringReader, BufferedReader}
import java.net.URLClassLoader

import scala.collection.mutable.ArrayBuffer
import scala.tools.nsc.interpreter.SparkILoop

import org.apache.spark.repl.SparkILoop

trait SparkRepl {

@@ -4,11 +4,12 @@ import java.net.InetAddress

import org.apache.commons.configuration.ConfigurationException

import scala.collection.JavaConversions._

/** A utility trait for integration testing.
* Manages *one* single Cassandra server at a time and enables switching its configuration.
* This is not thread safe, and test suites must not be run in parallel,
* because they will "steal" the server.*/
* because they will "steal" the server. */
trait EmbeddedCassandra {

/** Implementation hook. */
@@ -19,7 +20,7 @@ trait EmbeddedCassandra {
* the state (including data) of the previously running cassandra cluster is lost.
* @param configTemplates name of the cassandra.yaml template resources
* @param forceReload if set to true, the server will be reloaded fresh
* even if the configuration didn't change */
* even if the configuration didn't change */
def useCassandraConfig(configTemplates: Seq[String], forceReload: Boolean = false) {
import EmbeddedCassandra._
import UserDefinedProperty._
@@ -49,32 +50,40 @@ object UserDefinedProperty {

trait TypedProperty {
type ValueType

def convertValueFromString(str: String): ValueType

def checkValueType(obj: Any): ValueType
}

trait IntProperty extends TypedProperty {
type ValueType = Int

def convertValueFromString(str: String) = str.toInt

def checkValueType(obj: Any) =
obj match {
case x: Int => x
case _ => throw new ClassCastException (s"Expected Int but found ${obj.getClass.getName}")
case _ => throw new ClassCastException(s"Expected Int but found ${obj.getClass.getName}")
}
}

trait InetAddressProperty extends TypedProperty {
type ValueType = InetAddress

def convertValueFromString(str: String) = InetAddress.getByName(str)

def checkValueType(obj: Any) =
obj match {
case x: InetAddress => x
case _ => throw new ClassCastException (s"Expected InetAddress but found ${obj.getClass.getName}")
case _ => throw new ClassCastException(s"Expected InetAddress but found ${obj.getClass.getName}")
}
}

abstract sealed class NodeProperty(val propertyName: String) extends TypedProperty

case object HostProperty extends NodeProperty("IT_TEST_CASSANDRA_HOSTS") with InetAddressProperty

case object PortProperty extends NodeProperty("IT_TEST_CASSANDRA_PORTS") with IntProperty

private def getValueSeq(propertyName: String): Seq[String] = {
@@ -92,7 +101,7 @@ object UserDefinedProperty {

def getProperty(nodeProperty: NodeProperty): Option[String] =
sys.env.get(nodeProperty.propertyName)

def getPropertyOrThrowIfNotFound(nodeProperty: NodeProperty): String =
getProperty(nodeProperty).getOrElse(
throw new ConfigurationException(s"Missing ${nodeProperty.propertyName} in system environment"))
@@ -101,10 +110,10 @@
object EmbeddedCassandra {

import UserDefinedProperty._
private def countCommaSeparatedItemsIn(s: String): Int =

private def countCommaSeparatedItemsIn(s: String): Int =
s.count(_ == ',')

getProperty(HostProperty) match {
case None =>
case Some(hostsStr) =>
@@ -124,23 +133,27 @@ object EmbeddedCassandra {
require(hosts.isEmpty || index < hosts.length, s"$index index is overflow the size of ${hosts.length}")
val host = getHost(index).getHostAddress
Map(
"seeds" -> host,
"storage_port" -> getStoragePort(index).toString,
"ssl_storage_port" -> getSslStoragePort(index).toString,
"seeds" -> host,
"storage_port" -> getStoragePort(index).toString,
"ssl_storage_port" -> getSslStoragePort(index).toString,
"native_transport_port" -> getPort(index).toString,
"jmx_port" -> getJmxPort(index).toString,
"rpc_address" -> host,
"listen_address" -> host,
"cluster_name" -> getClusterName(index),
"keystore_path" -> ClassLoader.getSystemResource("keystore").getPath)
"jmx_port" -> getJmxPort(index).toString,
"rpc_address" -> host,
"listen_address" -> host,
"cluster_name" -> getClusterName(index),
"keystore_path" -> ClassLoader.getSystemResource("keystore").getPath)
}

def getStoragePort(index: Integer) = 7000 + index

def getSslStoragePort(index: Integer) = 7100 + index

def getJmxPort(index: Integer) = CassandraRunner.DefaultJmxPort + index

def getClusterName(index: Integer) = s"Test Cluster$index"

def getHost(index: Integer): InetAddress = getNodeProperty(index, HostProperty)

def getPort(index: Integer) = getNodeProperty(index, PortProperty)

private def getNodeProperty(index: Integer, nodeProperty: NodeProperty): nodeProperty.ValueType = {
@@ -153,7 +166,7 @@ object EmbeddedCassandra {
case _ => throw new RuntimeException(s"$index index is overflow the size of ${hosts.size}")
}
}
}
}

Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
override def run() = cassandraRunners.flatten.foreach(_.destroy())
@@ -164,9 +177,10 @@ private[connector] class CassandraRunner(val configTemplate: String, props: Map[
extends Embedded {

import java.io.{File, FileOutputStream, IOException}
import org.apache.cassandra.io.util.FileUtils
import com.google.common.io.Files

import CassandraRunner._
import com.google.common.io.Files
import org.apache.cassandra.io.util.FileUtils

val tempDir = mkdir(new File(Files.createTempDir(), "cassandra-driver-spark"))
val workDir = mkdir(new File(tempDir, "cassandra"))
@@ -183,7 +197,16 @@ private[connector] class CassandraRunner(val configTemplate: String, props: Map[
}
}

private val classPath = System.getProperty("java.class.path")
private val classPath = sys.env.get("IT_CASSANDRA_PATH") match {
case Some(customCassandraDir) =>
val entries = (for (f <- Files.fileTreeTraverser().breadthFirstTraversal(new File(customCassandraDir, "lib")).toIterator
if f.isDirectory || f.getName.endsWith(".jar")) yield {
f.getAbsolutePath
}).toList ::: new File(customCassandraDir, "conf") :: Nil
entries.mkString(File.pathSeparator)
case _ => System.getProperty("java.class.path")
}

private val javaBin = System.getProperty("java.home") + "/bin/java"
private val cassandraConfProperty = "-Dcassandra.config=file:" + confFile.toString
private val superuserSetupDelayProperty = "-Dcassandra.superuser_setup_delay_ms=0"
@@ -204,7 +227,7 @@ private[connector] class CassandraRunner(val configTemplate: String, props: Map[
.inheritIO()
.start()

val nativePort = props.get("native_transport_port").get.toInt
val nativePort = props.get("native_transport_port").get.toInt
if (!waitForPortOpen(InetAddress.getByName(props.get("rpc_address").get), nativePort, 100000))
throw new IOException("Failed to start Cassandra.")

@@ -357,6 +357,7 @@ object TypeConverter {
case x: java.lang.Integer => new java.math.BigInteger(x.toString)
case x: java.lang.Long => new java.math.BigInteger(x.toString)
case x: String => new java.math.BigInteger(x)
case x: java.math.BigDecimal if x.scale() <= 0 => x.toBigInteger
}
}

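The extra converter case above accepts scale-zero java.math.BigDecimal values, the shape Spark 1.5 commonly hands back for Cassandra varint data, and turns them into java.math.BigInteger. A rough sketch of the intended behaviour, assuming the TypeConverter.forType lookup the connector exposes:

import com.datastax.spark.connector.types.TypeConverter

// A scale-0 decimal should now convert to BigInteger instead of falling through the match.
val toBigInteger = TypeConverter.forType[java.math.BigInteger]
val result = toBigInteger.convert(new java.math.BigDecimal("12345"))  // java.math.BigInteger 12345
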
@@ -2,7 +2,7 @@ package com.datastax.spark.connector.writer

import com.datastax.spark.connector.ColumnRef
import com.datastax.spark.connector.cql.TableDef
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.Row

/** A [[RowWriter]] that can write SparkSQL `Row` objects. */
class SqlRowWriter(val table: TableDef, val selectedColumns: IndexedSeq[ColumnRef])
@@ -2,23 +2,17 @@ package org.apache.spark.sql.cassandra

import java.io.IOException

import org.apache.spark.sql.catalyst.{SimpleCatalystConf, CatalystConf}

import com.datastax.spark.connector.cql.{CassandraConnector, Schema}
import com.google.common.cache.{CacheBuilder, CacheLoader}

import scala.collection.JavaConversions._

import com.google.common.cache.{LoadingCache, CacheBuilder, CacheLoader}
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.spark.Logging

import org.apache.spark.sql.cassandra.CassandraSourceRelation._
import org.apache.spark.sql.catalyst.analysis.Catalog
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
import org.apache.spark.sql.sources.LogicalRelation

import com.datastax.spark.connector.cql.{CassandraConnectorConf, CassandraConnector, Schema}
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf, TableIdentifier}
import org.apache.spark.sql.execution.datasources.LogicalRelation

import CassandraSourceRelation._
import com.datastax.spark.connector.cql.{CassandraConnector, CassandraConnectorConf, Schema}

private[cassandra] class CassandraCatalog(csc: CassandraSQLContext) extends Catalog with Logging {

@@ -136,7 +130,7 @@ private[cassandra] class CassandraCatalog(csc: CassandraSQLContext) extends Cata

override val conf: CatalystConf = SimpleCatalystConf(caseSensitive)

def refreshTable(databaseName: String, tableName: String): Unit = {
cachedDataSourceTables.refresh(Seq(csc.getCluster, databaseName, tableName))
override def refreshTable(tableIdent: TableIdentifier): Unit = {
cachedDataSourceTables.refresh(tableIdent.toSeq)
}
}
@@ -2,15 +2,11 @@ package org.apache.spark.sql.cassandra

import java.util.NoSuchElementException

import org.apache.commons.lang.StringUtils
import org.apache.spark.sql.sources.DataSourceStrategy
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.analysis.OverrideCatalog
import org.apache.spark.SparkContext
import org.apache.spark.sql.cassandra.CassandraSourceRelation._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{DataFrame, Strategy, SQLContext}

import CassandraSQLContext._
import CassandraSourceRelation._
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.{DataFrame, SQLContext, Strategy}

/** Allows to execute SQL queries against Cassandra and access results as
* `SchemaRDD` collections. Predicate pushdown to Cassandra is supported.
@@ -36,7 +32,7 @@ import CassandraSourceRelation._
*
* }}} */
class CassandraSQLContext(sc: SparkContext) extends SQLContext(sc) {
import CassandraSQLContext._
import org.apache.spark.sql.cassandra.CassandraSQLContext._

override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution(plan)
@@ -87,12 +83,12 @@ class CassandraSQLContext(sc: SparkContext) extends SQLContext(sc) {
override val strategies: Seq[Strategy] = Seq(
DataSourceStrategy,
DDLStrategy,
TakeOrdered,
ParquetOperations,
TakeOrderedAndProject,
InMemoryScans,
HashAggregation,
Aggregation,
LeftSemiJoin,
HashJoin,
EquiJoinSelection,
BasicOperators,
CartesianProduct,
BroadcastNestedLoopJoin
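The strategy list above tracks planner renames in Spark 1.5 (TakeOrdered becomes TakeOrderedAndProject, HashAggregation becomes Aggregation, HashJoin becomes EquiJoinSelection, and ParquetOperations is dropped), while the outward API of CassandraSQLContext stays the same. A minimal usage sketch; the keyspace and table names are placeholders, and setKeyspace/sql are the methods described in the class scaladoc:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.cassandra.CassandraSQLContext

val sc  = new SparkContext(new SparkConf().setAppName("cassandra-sql-demo"))
val csc = new CassandraSQLContext(sc)
csc.setKeyspace("test")                              // placeholder keyspace
val words = csc.sql("SELECT word, count FROM words") // supported predicates are pushed down to Cassandra
words.show()
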
@@ -1,15 +1,16 @@
package org.apache.spark.sql.cassandra

import java.math.BigInteger
import java.sql.Timestamp
import java.util.Date

import org.apache.spark.sql.types.UTF8String

import com.datastax.driver.core.{Row, ProtocolVersion}
import com.datastax.spark.connector.GettableData
import com.datastax.spark.connector.rdd.reader.{ThisRowReaderAsFactory, RowReader}
import com.datastax.spark.connector.types.TypeConverter
import org.apache.spark.sql.catalyst.expressions.{Row => SparkRow}
import org.apache.spark.sql.{Row => SparkRow}
import org.apache.spark.unsafe.types.UTF8String
import java.math.{BigDecimal => JBigDecimal}

final class CassandraSQLRow(val columnNames: IndexedSeq[String], val columnValues: IndexedSeq[AnyRef])
extends GettableData with SparkRow with Serializable {
@@ -35,6 +36,10 @@ final class CassandraSQLRow(val columnNames: IndexedSeq[String], val columnValue
override def getShort(i: Int) = get[Short](i)
override def getInt(i: Int) = get[Int](i)
override def getString(i: Int) = get[String](i)
override def get(i: Int) = get[Any](i)

override def isNullAt(i: Int): Boolean = super[GettableData].isNullAt(i)

override def toSeq: Seq[Any] = columnValues
}

@@ -47,7 +52,8 @@ object CassandraSQLRow {
data(i) = GettableData.get(row, i)
data(i) match {
case date: Date => data.update(i, new Timestamp(date.getTime))
case str: String => data.update(i, UTF8String(str))
case bigInt: BigInteger => data.update(i, new JBigDecimal(bigInt))
case str: String => data.update(i, UTF8String.fromString(str))
case set: Set[_] => data.update(i, set.toSeq)
case _ =>
}
@@ -25,8 +25,8 @@ object DataTypeConverter extends Logging {
connector.types.FloatType -> catalystTypes.FloatType,
connector.types.DoubleType -> catalystTypes.DoubleType,

connector.types.VarIntType -> catalystTypes.DecimalType(), // no native arbitrary-size integer type
connector.types.DecimalType -> catalystTypes.DecimalType(),
connector.types.VarIntType -> catalystTypes.DecimalType(38, 0), // no native arbitrary-size integer type
connector.types.DecimalType -> catalystTypes.DecimalType(38, 18),

connector.types.TimestampType -> catalystTypes.TimestampType,
connector.types.InetType -> InetAddressType,
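Spark 1.5 drops unlimited-precision decimals, so the mapping above pins an explicit precision and scale instead of the bare DecimalType(); 38 is the maximum precision Catalyst accepts. A small illustrative sketch of the resulting Catalyst types:

import org.apache.spark.sql.types.DecimalType

// varint has no fractional part, so it maps onto the widest integral decimal;
// decimal keeps a (38, 18) shape, leaving 18 digits after the point.
val varintCatalystType  = DecimalType(38, 0)
val decimalCatalystType = DecimalType(38, 18)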
