Cassandra as sink option for ingestion job (#51)
* Cassandra as alternative online storage

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Add TTL

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Add default cassandra write properties

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Use pre-generated schema

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Use multiple columns for schema ref

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Move cassandra keyspace config to spark conf

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Expose Cassandra store to python SDK

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
khorshuheng committed Apr 7, 2021
1 parent 52c7393 commit fc32687
Showing 17 changed files with 410 additions and 8 deletions.
6 changes: 6 additions & 0 deletions python/feast_spark/constants.py
@@ -114,6 +114,12 @@ class ConfigOptions(metaclass=ConfigMeta):
#: BigTable Instance ID
BIGTABLE_INSTANCE: Optional[str] = ""

#: Cassandra host. Can be a comma separated string
CASSANDRA_HOST: Optional[str] = ""

#: Cassandra port
CASSANDRA_PORT: Optional[str] = ""

#: Enable or disable StatsD
STATSD_ENABLED: str = "False"

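Both options are read through client.config when the launcher assembles the ingestion job parameters (see the launcher.py change below); the host value may list several contact points separated by commas.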
18 changes: 18 additions & 0 deletions python/feast_spark/pyspark/abc.py
@@ -333,6 +333,8 @@ def __init__(
redis_ssl: Optional[bool] = None,
bigtable_project: Optional[str] = None,
bigtable_instance: Optional[str] = None,
cassandra_host: Optional[str] = None,
cassandra_port: Optional[int] = None,
statsd_host: Optional[str] = None,
statsd_port: Optional[int] = None,
deadletter_path: Optional[str] = None,
@@ -347,6 +349,8 @@ def __init__(
self._redis_ssl = redis_ssl
self._bigtable_project = bigtable_project
self._bigtable_instance = bigtable_instance
self._cassandra_host = cassandra_host
self._cassandra_port = cassandra_port
self._statsd_host = statsd_host
self._statsd_port = statsd_port
self._deadletter_path = deadletter_path
@@ -361,6 +365,9 @@ def _get_bigtable_config(self):
project_id=self._bigtable_project, instance_id=self._bigtable_instance
)

def _get_cassandra_config(self):
return dict(host=self._cassandra_host, port=self._cassandra_port)

def _get_statsd_config(self):
return (
dict(host=self._statsd_host, port=self._statsd_port)
@@ -394,6 +401,9 @@ def get_arguments(self) -> List[str]:
if self._bigtable_project and self._bigtable_instance:
args.extend(["--bigtable", json.dumps(self._get_bigtable_config())])

if self._cassandra_host and self._cassandra_port:
args.extend(["--cassandra", json.dumps(self._get_cassandra_config())])

if self._get_statsd_config():
args.extend(["--statsd", json.dumps(self._get_statsd_config())])

@@ -427,6 +437,8 @@ def __init__(
redis_ssl: Optional[bool],
bigtable_project: Optional[str],
bigtable_instance: Optional[str],
cassandra_host: Optional[str] = None,
cassandra_port: Optional[int] = None,
statsd_host: Optional[str] = None,
statsd_port: Optional[int] = None,
deadletter_path: Optional[str] = None,
@@ -441,6 +453,8 @@ def __init__(
redis_ssl,
bigtable_project,
bigtable_instance,
cassandra_host,
cassandra_port,
statsd_host,
statsd_port,
deadletter_path,
@@ -481,6 +495,8 @@ def __init__(
redis_ssl: Optional[bool],
bigtable_project: Optional[str],
bigtable_instance: Optional[str],
cassandra_host: Optional[str] = None,
cassandra_port: Optional[int] = None,
statsd_host: Optional[str] = None,
statsd_port: Optional[int] = None,
deadletter_path: Optional[str] = None,
@@ -497,6 +513,8 @@ def __init__(
redis_ssl,
bigtable_project,
bigtable_instance,
cassandra_host,
cassandra_port,
statsd_host,
statsd_port,
deadletter_path,
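When both a Cassandra host and port are supplied, get_arguments() serializes _get_cassandra_config() and appends it to the job arguments, for example (placeholder values): --cassandra '{"host": "10.4.0.5", "port": 9042}'.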
3 changes: 3 additions & 0 deletions python/feast_spark/pyspark/launcher.py
@@ -276,6 +276,9 @@ def start_offline_to_online_ingestion(
redis_ssl=client.config.getboolean(opt.REDIS_SSL),
bigtable_project=client.config.get(opt.BIGTABLE_PROJECT),
bigtable_instance=client.config.get(opt.BIGTABLE_INSTANCE),
cassandra_host=client.config.get(opt.CASSANDRA_HOST),
cassandra_port=bool(client.config.get(opt.CASSANDRA_HOST))
and client.config.getint(opt.CASSANDRA_PORT),
statsd_host=(
client.config.getboolean(opt.STATSD_ENABLED)
and client.config.get(opt.STATSD_HOST)
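Note the short-circuit in the cassandra_port expression: bool(client.config.get(opt.CASSANDRA_HOST)) evaluates to False when no Cassandra host is configured, so CASSANDRA_PORT is only parsed as an integer (and the --cassandra argument only emitted downstream) when a host is actually set.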
6 changes: 6 additions & 0 deletions spark/ingestion/pom.xml
@@ -177,6 +177,12 @@
<version>2.5.0</version>
</dependency>

<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_${scala.version}</artifactId>
<version>3.0.0</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
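This connector artifact provides the CassandraSparkExtensions and CassandraCatalog classes that BasePipeline wires into the Spark session configuration below.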
12 changes: 12 additions & 0 deletions spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala
@@ -42,6 +42,18 @@ object BasePipeline {
conf
.set("spark.bigtable.projectId", projectId)
.set("spark.bigtable.instanceId", instanceId)
case CassandraConfig(connection, keyspace, properties) =>
conf
.set("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
.set("spark.cassandra.connection.host", connection.host)
.set("spark.cassandra.connection.port", connection.port.toString)
.set("spark.cassandra.output.batch.size.bytes", properties.batchSize.toString)
.set("spark.cassandra.output.concurrent.writes", properties.concurrentWrite.toString)
.set(
s"spark.sql.catalog.feast",
"com.datastax.spark.connector.datasource.CassandraCatalog"
)
.set("feast.store.cassandra.keyspace", keyspace)
}

jobConfig.metrics match {
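For illustration, a hypothetical CassandraConfig (all values below are placeholders) and the SparkConf entries that the CassandraConfig arm above would derive from it:

val store = CassandraConfig(
  connection = CassandraConnection("10.4.0.5", 9042),
  keyspace = "feast",
  properties = CassandraWriteProperties(batchSize = 1024, concurrentWrite = 5)
)
// The resulting SparkConf would then contain:
//   spark.sql.extensions                     = com.datastax.spark.connector.CassandraSparkExtensions
//   spark.cassandra.connection.host          = 10.4.0.5
//   spark.cassandra.connection.port          = 9042
//   spark.cassandra.output.batch.size.bytes  = 1024
//   spark.cassandra.output.concurrent.writes = 5
//   spark.sql.catalog.feast                  = com.datastax.spark.connector.datasource.CassandraCatalog
//   feast.store.cassandra.keyspace           = feast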
@@ -78,8 +78,9 @@ object BatchPipeline extends BasePipeline {

validRows.write
.format(config.store match {
case _: RedisConfig => "feast.ingestion.stores.redis"
case _: BigTableConfig => "feast.ingestion.stores.bigtable"
case _: RedisConfig => "feast.ingestion.stores.redis"
case _: BigTableConfig => "feast.ingestion.stores.bigtable"
case _: CassandraConfig => "feast.ingestion.stores.cassandra"
})
.option("entity_columns", featureTable.entities.map(_.name).mkString(","))
.option("namespace", featureTable.name)
@@ -84,6 +84,9 @@ object IngestionJob {
opt[String](name = "bigtable")
.action((x, c) => c.copy(store = parseJSON(x).camelizeKeys.extract[BigTableConfig]))

opt[String](name = "cassandra")
.action((x, c) => c.copy(store = parseJSON(x).extract[CassandraConfig]))

opt[String](name = "statsd")
.action((x, c) => c.copy(metrics = Some(parseJSON(x).extract[StatsDConfig])))

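Unlike the --bigtable option, the --cassandra payload is extracted without camelizeKeys, so its JSON keys must match the CassandraConfig field names as-is, for example (placeholder values): {"connection": {"host": "10.4.0.5", "port": 9042}, "keyspace": "feast"}. A properties block may be included to override the default CassandraWriteProperties(1024, 5).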
@@ -28,6 +28,13 @@ abstract class StoreConfig

case class RedisConfig(host: String, port: Int, ssl: Boolean) extends StoreConfig
case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig
case class CassandraConfig(
connection: CassandraConnection,
keyspace: String,
properties: CassandraWriteProperties = CassandraWriteProperties(1024, 5)
) extends StoreConfig
case class CassandraConnection(host: String, port: Int)
case class CassandraWriteProperties(batchSize: Int, concurrentWrite: Int)

sealed trait MetricConfig

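A minimal sketch (placeholder connection details) showing the default write properties on the CassandraConfig case class above:

val store = CassandraConfig(
  connection = CassandraConnection(host = "10.4.0.5", port = 9042),
  keyspace = "feast"
)
// properties was not supplied, so the declared default applies:
// store.properties == CassandraWriteProperties(batchSize = 1024, concurrentWrite = 5)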
@@ -112,8 +112,9 @@ object StreamingPipeline extends BasePipeline with Serializable {
.filter(if (config.doNotIngestInvalidRows) expr("_isValid") else rowValidator.allChecks)
.write
.format(config.store match {
case _: RedisConfig => "feast.ingestion.stores.redis"
case _: BigTableConfig => "feast.ingestion.stores.bigtable"
case _: RedisConfig => "feast.ingestion.stores.redis"
case _: BigTableConfig => "feast.ingestion.stores.bigtable"
case _: CassandraConfig => "feast.ingestion.stores.cassandra"
})
.option("entity_columns", featureTable.entities.map(_.name).mkString(","))
.option("namespace", featureTable.name)
@@ -32,7 +32,7 @@ import org.apache.spark.sql.functions.{col, length, struct, udf}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.types.{StringType, StructType}
import feast.ingestion.stores.bigtable.serialization.Serializer
import feast.ingestion.stores.serialization.Serializer
import org.apache.hadoop.security.UserGroupInformation

class BigTableSinkRelation(
@@ -19,7 +19,7 @@ package feast.ingestion.stores.bigtable
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import com.google.cloud.bigtable.hbase.BigtableConfiguration
import feast.ingestion.stores.bigtable.serialization.AvroSerializer
import feast.ingestion.stores.serialization.AvroSerializer

class DefaultSource extends CreatableRelationProvider {
override def createRelation(
@@ -0,0 +1,129 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2021 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.ingestion.stores.cassandra

import feast.ingestion.stores.serialization.Serializer
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, lit, struct, udf}
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

class CassandraSinkRelation(
override val sqlContext: SQLContext,
val serializer: Serializer,
val config: SparkCassandraConfig
) extends BaseRelation
with InsertableRelation
with Serializable {
override def schema: StructType = ???

override def insert(data: DataFrame, overwrite: Boolean): Unit = {

val featureFields = data.schema.fields
.filterNot(f => isSystemColumn(f.name))

val featureColumns = featureFields.map(f => data(f.name))

val entityColumns = config.entityColumns.map(c => data(c).cast(StringType))
val schema = serializer.convertSchema(StructType(featureFields))
val schemaReference = serializer.schemaReference(schema)

val writerWithoutTTL = data
.select(
joinEntityKey(struct(entityColumns: _*)).alias("key"),
serializer.serializeData(schema)(struct(featureColumns: _*)).alias(columnName),
col(config.timestampColumn).alias("ts")
)
.withColumn(schemaRefColumnName, lit(schemaReference))
.writeTo(fullTableReference)
.option("writeTime", "ts")

val writer =
if (config.maxAge <= 0)
writerWithoutTTL
else writerWithoutTTL.option("ttl", config.maxAge.toString)

writer.append()
}

def sanitizedForCassandra(expr: String): String = {
expr.replace('-', '_')
}

val tableName = {
val entities = config.entityColumns.mkString("__")
sanitizedForCassandra(s"${config.projectName}__${entities}")
}

val keyspace = sqlContext.sparkContext.getConf.get("feast.store.cassandra.keyspace")

val sparkCatalog = "feast"

val fullTableReference = s"${sparkCatalog}.${keyspace}.`${tableName}`"

val columnName = sanitizedForCassandra(config.namespace)

val schemaRefColumnName = sanitizedForCassandra(s"${config.namespace}__schema_ref")

val schemaTableName = s"${sparkCatalog}.${keyspace}.feast_schema_reference"

def createTable(): Unit = {

sqlContext.sql(s"""
|CREATE TABLE IF NOT EXISTS ${fullTableReference}
|(key BINARY)
|USING cassandra
|PARTITIONED BY (key)
|""".stripMargin)

sqlContext.sql(s"""
|ALTER TABLE ${fullTableReference}
|ADD COLUMNS (${columnName} BINARY, ${schemaRefColumnName} BINARY)
|""".stripMargin)

}

private def joinEntityKey: UserDefinedFunction = udf { r: Row =>
((0 until r.size)).map(r.getString).mkString("#").getBytes
}

private def isSystemColumn(name: String) =
(config.entityColumns ++ Seq(config.timestampColumn)).contains(name)

def saveWriteSchema(data: DataFrame) = {
sqlContext.sql(s"""
|CREATE TABLE IF NOT EXISTS ${schemaTableName}
|(schema_ref BINARY, avro_schema BINARY)
|USING cassandra
|PARTITIONED BY (schema_ref)
|""".stripMargin)

val featureFields = data.schema.fields
.filterNot(f => isSystemColumn(f.name))
val featureSchema = StructType(featureFields)

val schema = serializer.convertSchema(featureSchema)
val key = serializer.schemaReference(schema)

import sqlContext.sparkSession.implicits._
val schemaData =
List((key, schema.asInstanceOf[String].getBytes)).toDF("schema_ref", "avro_schema")

schemaData.writeTo(schemaTableName).append()
}
}
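To make the naming scheme above concrete, a standalone sketch (the project, entity, and feature table names are invented) that reproduces the sanitization logic:

def sanitizedForCassandra(expr: String): String = expr.replace('-', '_')

// Invented values, for illustration only
val projectName   = "my-project"
val entityColumns = Seq("driver_id", "customer_id")
val namespace     = "driver-stats"

val entities            = entityColumns.mkString("__")
val tableName           = sanitizedForCassandra(s"${projectName}__${entities}")
val columnName          = sanitizedForCassandra(namespace)
val schemaRefColumnName = sanitizedForCassandra(s"${namespace}__schema_ref")
// tableName           == "my_project__driver_id__customer_id"
// columnName          == "driver_stats"
// schemaRefColumnName == "driver_stats__schema_ref"
// With the Spark catalog registered as "feast" and feast.store.cassandra.keyspace = "feast",
// fullTableReference would be feast.feast.`my_project__driver_id__customer_id`, and write
// schemas would be kept in feast.feast.feast_schema_reference.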
@@ -0,0 +1,42 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2021 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.ingestion.stores.cassandra

import feast.ingestion.stores.serialization.AvroSerializer
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

class DefaultSource extends CreatableRelationProvider {
override def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
data: DataFrame
): BaseRelation = {

val rel =
new CassandraSinkRelation(
sqlContext,
new AvroSerializer,
SparkCassandraConfig.parse(parameters)
)
rel.createTable()
rel.saveWriteSchema(data)
rel.insert(data, overwrite = false)
rel
}
}
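The sink is registered through the Spark DataSource API, so the pipelines reach it with df.write.format(...). A minimal sketch of that call, mirroring only the options visible in the BatchPipeline change above; any further options consumed by SparkCassandraConfig.parse are not part of this diff and are omitted:

// Sketch only: validRows stands for the validated feature DataFrame and featureTable
// for the feature table spec, as in BatchPipeline; additional required options
// (project name, timestamp column, max age) are defined outside this diff.
validRows.write
  .format("feast.ingestion.stores.cassandra")
  .option("entity_columns", featureTable.entities.map(_.name).mkString(","))
  .option("namespace", featureTable.name)
  .save()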
