Call admin.modifyTable only if table spec changes: preventing the quota from being exceeded (#66)

* Use the Bigtable admin API only if something changed

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* Spec for updating TTL on an existing column family

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
pyalex committed Apr 28, 2021
1 parent 8e5c0e4 commit bb0fd44
Showing 2 changed files with 54 additions and 18 deletions.
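
The gist of the change, sketched below as a hedged, self-contained helper: the table descriptor is only pushed through the Bigtable admin API when the column family is missing or its TTL no longer matches the configured maxAge; otherwise no admin call is made at all, so repeated ingestion runs stop consuming the admin API quota. The HBase calls are the ones visible in the diff; the BigTableAdminGuard object, the ensureColumnFamily wrapper, and its namespace/maxAgeSeconds parameters are illustrative assumptions, not code from the repository.

import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor}
import org.apache.hadoop.hbase.client.Admin

// Hypothetical wrapper around the logic in BigTableSinkRelation.createTable();
// `namespace` and `maxAgeSeconds` stand in for config.namespace and config.maxAge.
object BigTableAdminGuard {
  def ensureColumnFamily(
      admin: Admin,
      table: HTableDescriptor,
      namespace: String,
      maxAgeSeconds: Long
  ): Unit = {
    val featuresCF = new HColumnDescriptor(namespace)
    if (maxAgeSeconds > 0) {
      featuresCF.setTimeToLive(maxAgeSeconds.toInt) // HBase TTL is expressed in seconds
    }
    featuresCF.setMaxVersions(1)

    if (!table.getColumnFamilyNames.contains(namespace.getBytes)) {
      // Family is missing: add it, then create the table or modify it exactly once.
      table.addFamily(featuresCF)
      if (!admin.isTableAvailable(table.getTableName)) admin.createTable(table)
      else admin.modifyTable(table)
    } else if (
      maxAgeSeconds > 0 &&
      table.getColumnFamily(namespace.getBytes).getTimeToLive != featuresCF.getTimeToLive
    ) {
      // Family exists but its TTL differs from the configured max age: update that family.
      table.modifyFamily(featuresCF)
      admin.modifyTable(table)
    }
    // Otherwise nothing changed and no admin API call is issued.
  }
}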
@@ -64,26 +64,28 @@ class BigTableSinkRelation(
         admin.getTableDescriptor(TableName.valueOf(tableName))
       }

-      if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) {
-        val featuresCF = new HColumnDescriptor(config.namespace)
-        if (config.maxAge > 0) {
-          featuresCF.setTimeToLive(config.maxAge.toInt)
-        }
+      val featuresCF = new HColumnDescriptor(config.namespace)
+      if (config.maxAge > 0) {
+        featuresCF.setTimeToLive(config.maxAge.toInt)
+      }

-        featuresCF.setMaxVersions(1)
+      featuresCF.setMaxVersions(1)
+
+      if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) {
         table.addFamily(featuresCF)
-      }

-      try {
-        admin.createTable(table)
-      } catch {
-        case _: TableExistsException =>
-          try {
-            admin.modifyTable(table)
-          } catch {
-            case e: IOException =>
-              println(s"Table modification failed: ${e.getMessage}")
-          }
+        if (!admin.isTableAvailable(table.getTableName)) {
+          admin.createTable(table)
+        } else {
+          admin.modifyTable(table)
+        }
+      } else if (
+        config.maxAge > 0 && table
+          .getColumnFamily(config.namespace.getBytes)
+          .getTimeToLive != featuresCF.getTimeToLive
+      ) {
+        table.modifyFamily(featuresCF)
+        admin.modifyTable(table)
       }
     } finally {
       btConn.close()
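A note on units, spelled out here as an assumption rather than something stated in the diff: HColumnDescriptor TTLs are given in seconds, and the test below (maxAge of 600 yielding a TTL of 600) suggests maxAge is in seconds as well, which is why the value can be fed straight into setTimeToLive and compared against getTimeToLive. A minimal standalone illustration of the comparison that now gates admin.modifyTable:

import org.apache.hadoop.hbase.HColumnDescriptor

// Standalone sketch (not repository code): the TTL inequality that decides whether
// BigTableSinkRelation needs to call admin.modifyTable for an existing column family.
object TtlComparison extends App {
  val existing = new HColumnDescriptor("feature-table-name")
  existing.setTimeToLive(300) // what the stored table descriptor currently holds

  val desired = new HColumnDescriptor("feature-table-name")
  desired.setTimeToLive(600) // derived from config.maxAge

  // true means the descriptor differs and a single modifyTable call is justified
  println(existing.getTimeToLive != desired.getTimeToLive)
}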
@@ -24,7 +24,7 @@ import feast.proto.types.ValueProto.ValueType
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
 import org.apache.avro.io.DecoderFactory
-import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
 import org.apache.hadoop.hbase.client.{Connection, Scan}
 import org.apache.spark.SparkConf
 import org.joda.time.DateTime
@@ -143,4 +143,38 @@ class BigTableIngestionSpec extends SparkSpec with ForAllTestContainer {
       cf.getMinVersions should be(0)
     }
   }
+
+  "Column family" should "be updated when ttl changed" in withBTConnection { btConn =>
+    new Scope {
+      val rows     = generateDistinctRows(gen, 1, (_: TestRow).customer)
+      val tempPath = storeAsParquet(sparkSession, rows)
+      val configWithOfflineSource = config.copy(
+        source = FileSource(tempPath, Map.empty, "eventTimestamp"),
+        featureTable = config.featureTable.copy(
+          name = "feature-table-name",
+          maxAge = Some(600L)
+        )
+      )
+
+      val tableName  = TableName.valueOf("default__customer")
+      val cfName     = configWithOfflineSource.featureTable.name.getBytes
+      val tableV1    = new HTableDescriptor(tableName)
+      val featuresCF = new HColumnDescriptor(cfName)
+      val metadataCF = new HColumnDescriptor("metadata")
+      featuresCF.setTimeToLive(300)
+      tableV1.addFamily(featuresCF)
+      tableV1.addFamily(metadataCF)
+
+      if (btConn.getAdmin.isTableAvailable(tableName)) {
+        btConn.getAdmin.deleteTable(tableName)
+      }
+      btConn.getAdmin.createTable(tableV1)
+
+      BatchPipeline.createPipeline(sparkSession, configWithOfflineSource)
+
+      val tableV2 = btConn.getAdmin.getDescriptor(tableName)
+      val cf      = tableV2.getColumnFamily(cfName)
+      cf.getTimeToLive should be(600)
+    }
+  }
 }
