Cassandra schema update should not fail when column exists (#55)
Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
khorshuheng committed Apr 14, 2021
1 parent 415b612 commit 1b24bac
Showing 2 changed files with 33 additions and 6 deletions.
@@ -21,7 +21,9 @@ import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.{col, lit, struct, udf}
 import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
 import org.apache.spark.sql.types.{StringType, StructType}
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SQLContext}
+
+import scala.util.{Failure, Success, Try}
 
 class CassandraSinkRelation(
     override val sqlContext: SQLContext,
@@ -91,10 +93,13 @@ class CassandraSinkRelation(
          |PARTITIONED BY (key)
          |""".stripMargin)
 
-    sqlContext.sql(s"""
+    Try(sqlContext.sql(s"""
          |ALTER TABLE ${fullTableReference}
          |ADD COLUMNS (${columnName} BINARY, ${schemaRefColumnName} BINARY)
-         |""".stripMargin)
+         |""".stripMargin)) match {
+      case Success(_) | Failure(_: AnalysisException) =>
+      case Failure(e)                                  => throw e
+    }
 
   }
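In short: the ALTER TABLE is now wrapped in Try, and an AnalysisException (which Spark raises when the columns already exist) is treated the same as success, while any other failure is rethrown. A minimal standalone sketch of the same guard, where spark, table, and column are hypothetical placeholders rather than identifiers from this commit:

import org.apache.spark.sql.{AnalysisException, SparkSession}

import scala.util.{Failure, Success, Try}

// Issue the ALTER TABLE; a failure caused by the column already existing
// (AnalysisException) is treated as success, anything else is rethrown.
def addColumnIfMissing(spark: SparkSession, table: String, column: String): Unit =
  Try(spark.sql(s"ALTER TABLE $table ADD COLUMNS ($column BINARY)")) match {
    case Success(_) | Failure(_: AnalysisException) => ()
    case Failure(e)                                 => throw e
  }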

@@ -16,6 +16,7 @@
  */
 package feast.ingestion
 
+import com.datastax.spark.connector.cql.CassandraConnector
 import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer}
 import feast.ingestion.helpers.DataHelper.{generateDistinctRows, rowGenerator, storeAsParquet}
 import feast.ingestion.helpers.TestRow
@@ -76,6 +77,14 @@ class CassandraIngestionSpec extends SparkSpec with ForAllTestContainer {
 
     val gen = rowGenerator(DateTime.parse("2020-08-01"), DateTime.parse("2020-09-01"))
 
+    def resetDatabase(): Unit = {
+      val connector = CassandraConnector(sparkSession.sparkContext.getConf)
+      connector.withSessionDo(session => session.execute("DROP KEYSPACE IF EXISTS feast"))
+      sparkSession.sql(
+        "CREATE DATABASE feast.feast WITH DBPROPERTIES (class='SimpleStrategy',replication_factor='1')"
+      )
+    }
+
   }
 
   def decodeAvroValue(input: Array[Byte], jsonFormatSchema: String): GenericRecord = {
@@ -89,9 +98,7 @@ class CassandraIngestionSpec extends SparkSpec with ForAllTestContainer {
   }
 
   "Dataset" should "be ingested in cassandra" in new Scope {
-    sparkSession.sql(
-      "CREATE DATABASE IF NOT EXISTS feast.feast WITH DBPROPERTIES (class='SimpleStrategy',replication_factor='1')"
-    )
+    resetDatabase()
     val rows = generateDistinctRows(gen, 1000, (_: TestRow).customer).filterNot(_.customer.isEmpty)
     val tempPath = storeAsParquet(sparkSession, rows)
     val configWithOfflineSource = config.copy(
@@ -129,4 +136,19 @@
     storedRows should contain allElementsOf rows
 
   }
+
+  "Cassandra schema update" should "not fail when column exists" in new Scope {
+    resetDatabase()
+    val rows = generateDistinctRows(gen, 1, (_: TestRow).customer).filterNot(_.customer.isEmpty)
+    val tempPath = storeAsParquet(sparkSession, rows)
+    val configWithOfflineSource = config.copy(
+      source = FileSource(tempPath, Map.empty, "eventTimestamp")
+    )
+
+    for (_ <- 0 to 2) {
+      BatchPipeline.createPipeline(sparkSession, configWithOfflineSource)
+    }
+
+  }
 }
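The guard also generalizes to other statements that may legitimately fail on a re-run. A hypothetical helper, not part of this commit, that ignores a single expected exception type:

import scala.reflect.ClassTag
import scala.util.{Failure, Try}

// Run `action`; swallow failures of the expected exception type E,
// rethrow everything else.
def ignoring[E <: Throwable](action: => Unit)(implicit tag: ClassTag[E]): Unit =
  Try(action) match {
    case Failure(e) if !tag.runtimeClass.isInstance(e) => throw e
    case _                                             => () // success, or the expected exception
  }

// Usage mirroring the fix (sqlContext and the DDL as in CassandraSinkRelation):
// ignoring[AnalysisException] {
//   sqlContext.sql(s"ALTER TABLE $fullTableReference ADD COLUMNS (...)")
// }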
