Configurable triggering interval for streaming ingestion (#63)
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

option default value

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
pyalex committed Apr 27, 2021
1 parent 8f6b793 commit af0ec85
Showing 5 changed files with 24 additions and 1 deletion.
5 changes: 5 additions & 0 deletions python/feast_spark/constants.py
@@ -160,6 +160,11 @@ class ConfigOptions(metaclass=ConfigMeta):
#: Whitelisted Feast projects
WHITELISTED_PROJECTS: Optional[str] = None

#: If set, the streaming ingestion job consumes incoming rows not continuously
#: but periodically, at the configured interval (in seconds).
#: This can help control the number of write requests to storage.
SPARK_STREAMING_TRIGGERING_INTERVAL: Optional[str] = None

def defaults(self):
return {
k: getattr(self, k)
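For orientation, a minimal sketch of the ConfigOptions pattern touched above: class attributes hold the option defaults (the new option defaults to None, i.e. continuous processing), and defaults() snapshots them into a dict via getattr. ConfigMeta and the full option set are omitted, and the comprehension's filter condition is cut off in the diff, so the uppercase check below is an assumption; this stand-in only mirrors the shape of the real class.

from typing import Optional

class ConfigOptionsSketch:
    # Stand-ins for the real options; names copied from the diff above.
    WHITELISTED_PROJECTS: Optional[str] = None
    SPARK_STREAMING_TRIGGERING_INTERVAL: Optional[str] = None

    def defaults(self):
        # The real filter is truncated in the diff; this uppercase check
        # is an assumption that reproduces the same result here.
        return {
            k: getattr(self, k)
            for k in dir(self)
            if k.isupper() and not k.startswith("_")
        }

print(ConfigOptionsSketch().defaults())
# {'SPARK_STREAMING_TRIGGERING_INTERVAL': None, 'WHITELISTED_PROJECTS': None}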
4 changes: 4 additions & 0 deletions python/feast_spark/pyspark/abc.py
@@ -503,6 +503,7 @@ def __init__(
checkpoint_path: Optional[str] = None,
stencil_url: Optional[str] = None,
drop_invalid_rows: bool = False,
triggering_interval: Optional[int] = None,
):
super().__init__(
feature_table,
@@ -523,6 +524,7 @@
)
self._extra_jars = extra_jars
self._checkpoint_path = checkpoint_path
self._triggering_interval = triggering_interval

def get_name(self) -> str:
return f"{self.get_job_type().to_pascal_case()}-{self.get_feature_table_name()}"
@@ -538,6 +540,8 @@ def get_arguments(self) -> List[str]:
args.extend(["--mode", "online"])
if self._checkpoint_path:
args.extend(["--checkpoint-path", self._checkpoint_path])
if self._triggering_interval:
args.extend(["--triggering-interval", str(self._triggering_interval)])
return args

def get_job_hash(self) -> str:
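A runnable sketch of how the new parameter reaches the Spark job's command line, mirroring the get_arguments logic above. The class is a cut-down stand-in for StreamIngestionJobParameters; constructor arguments other than the two shown are omitted.

from typing import List, Optional

class StreamJobArgsSketch:
    def __init__(
        self,
        checkpoint_path: Optional[str] = None,
        triggering_interval: Optional[int] = None,
    ):
        self._checkpoint_path = checkpoint_path
        self._triggering_interval = triggering_interval

    def get_arguments(self) -> List[str]:
        args = ["--mode", "online"]
        if self._checkpoint_path:
            args.extend(["--checkpoint-path", self._checkpoint_path])
        # None (option unset) adds no flag, so the job keeps its previous
        # continuous behavior; note that 0 is also skipped by this
        # truthiness check, same as in the diff above.
        if self._triggering_interval:
            args.extend(["--triggering-interval", str(self._triggering_interval)])
        return args

print(StreamJobArgsSketch(triggering_interval=30).get_arguments())
# ['--mode', 'online', '--triggering-interval', '30']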
3 changes: 3 additions & 0 deletions python/feast_spark/pyspark/launcher.py
@@ -315,6 +315,9 @@ def get_stream_to_online_ingestion_params(
checkpoint_path=client.config.get(opt.CHECKPOINT_PATH),
stencil_url=client.config.get(opt.STENCIL_URL),
drop_invalid_rows=client.config.get(opt.INGESTION_DROP_INVALID_ROWS),
triggering_interval=client.config.getint(
opt.SPARK_STREAMING_TRIGGERING_INTERVAL, default=None
),
)


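The lookup above relies on a getint helper that converts the stored option to an int and falls back to a default when the option is unset. A self-contained sketch of that behavior, with a hypothetical ConfigSketch standing in for client.config (the lowercase key name is illustrative, not taken from the diff):

from typing import Optional

class ConfigSketch:
    """Hypothetical stand-in for client.config; not the Feast API."""

    def __init__(self, options: dict):
        self._options = options

    def getint(self, key: str, default: Optional[int] = None) -> Optional[int]:
        raw = self._options.get(key)
        return int(raw) if raw is not None else default

cfg = ConfigSketch({"spark_streaming_triggering_interval": "30"})
assert cfg.getint("spark_streaming_triggering_interval") == 30
# Unset option: the default=None passed in launcher.py means "no trigger",
# i.e. the streaming job stays continuous.
assert cfg.getint("some_missing_option", default=None) is None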
3 changes: 3 additions & 0 deletions feast/ingestion/IngestionJob.scala
@@ -101,6 +101,9 @@ object IngestionJob {

opt[String](name = "checkpoint-path")
.action((x, c) => c.copy(checkpointPath = Some(x)))

opt[Int](name = "triggering-interval")
.action((x, c) => c.copy(streamingTriggeringSecs = x))
}

def main(args: Array[String]): Unit = {
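On the Spark side, scopt parses --triggering-interval into an Int (streamingTriggeringSecs). In Spark Structured Streaming such an interval is expressed as a processing-time trigger, so micro-batches fire on a fixed cadence instead of as fast as data arrives. A PySpark sketch of the effect, assuming that is how the Scala job applies the value; this is not the job's actual code, and the built-in "rate" source stands in for the real input stream.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("trigger-demo").getOrCreate()

# Synthetic source that emits rows continuously.
stream = spark.readStream.format("rate").load()

query = (
    stream.writeStream
    .format("console")
    .trigger(processingTime="30 seconds")  # the configured interval
    .start()
)
query.awaitTermination(65)  # let a couple of micro-batches fire
query.stop()
spark.stop()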
10 changes: 9 additions & 1 deletion feast/ingestion/stores/bigtable/BigTableSinkRelation.scala
@@ -16,6 +16,8 @@
*/
package feast.ingestion.stores.bigtable

import java.io.IOException

import com.google.cloud.bigtable.hbase.BigtableConfiguration
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{
@@ -75,7 +77,13 @@ class BigTableSinkRelation(
try {
admin.createTable(table)
} catch {
case _: TableExistsException => admin.modifyTable(table)
case _: TableExistsException =>
try {
admin.modifyTable(table)
} catch {
case e: IOException =>
println(s"Table modification failed: ${e.getMessage}")
}
}
} finally {
btConn.close()
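The second hunk wraps admin.modifyTable in its own try/catch: when the table already exists but its schema cannot be altered, the IOException is logged and ingestion continues instead of failing the whole streaming job. A Python sketch of the same create-or-modify pattern; ensure_table, TableExistsError, and the admin methods are illustrative stand-ins, not the HBase/Bigtable API.

import logging

class TableExistsError(Exception):
    """Stand-in for HBase's TableExistsException."""

def ensure_table(admin, table) -> None:
    try:
        admin.create_table(table)
    except TableExistsError:
        try:
            # Table is already there; try to bring its schema up to date.
            admin.modify_table(table)
        except IOError as e:
            # Mirror the diff: log and keep going rather than fail the job.
            logging.warning("Table modification failed: %s", e)

A logger would normally be preferable to the println used in the Scala change, but the control flow is the same: creation errors other than "already exists" still propagate, while a failed modification is tolerated.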
