Bigtable as alternative Online Storage (#46)
* BigTableRelation

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* bigtable configuration in jobservice

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
pyalex committed Mar 31, 2021
1 parent 2b6d19e commit c768f12
Showing 21 changed files with 612 additions and 51 deletions.
1 change: 1 addition & 0 deletions infra/scripts/test-end-to-end-gcp.sh
@@ -46,6 +46,7 @@ helm_install "js" "${DOCKER_REPOSITORY}" "${GIT_TAG}" "$NAMESPACE" \
--set "feast-jobservice.envOverrides.FEAST_DATAPROC_REGION=us-central1" \
--set "feast-jobservice.envOverrides.FEAST_SPARK_STAGING_LOCATION=gs://feast-templocation-kf-feast/" \
--set "feast-jobservice.envOverrides.FEAST_REDIS_HOST=10.128.0.105" \
--set "feast-jobservice.envOverrides.FEAST_REDIS_PORT=6379" \
--set 'feast-online-serving.application-override\.yaml.feast.stores[0].type=REDIS_CLUSTER' \
--set 'feast-online-serving.application-override\.yaml.feast.stores[0].name=online' \
--set 'feast-online-serving.application-override\.yaml.feast.stores[0].config.connection_string=10.128.0.105:6379' \
1 change: 1 addition & 0 deletions pom.xml
@@ -24,6 +24,7 @@
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
<protobuf.version>3.12.2</protobuf.version>
<commons.lang3.version>3.10</commons.lang3.version>
+<hbase.version>2.1.10</hbase.version>

<license.content><![CDATA[
/*
12 changes: 9 additions & 3 deletions python/feast_spark/constants.py
@@ -100,13 +100,19 @@ class ConfigOptions(metaclass=ConfigMeta):
HISTORICAL_FEATURE_OUTPUT_LOCATION: Optional[str] = None

#: Default Redis host
-REDIS_HOST: str = "localhost"
+REDIS_HOST: Optional[str] = ""

#: Default Redis port
-REDIS_PORT: str = "6379"
+REDIS_PORT: Optional[str] = ""

#: Enable or disable TLS/SSL to Redis
-REDIS_SSL: str = "False"
+REDIS_SSL: Optional[str] = "False"

+#: BigTable Project ID
+BIGTABLE_PROJECT: Optional[str] = ""
+
+#: BigTable Instance ID
+BIGTABLE_INSTANCE: Optional[str] = ""

#: Enable or disable StatsD
STATSD_ENABLED: str = "False"
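The two new options slot into the same environment-override pattern the e2e script uses for FEAST_REDIS_HOST. A minimal sketch of pointing the job service at Bigtable instead of Redis, assuming the standard FEAST_-prefixed mapping for ConfigOptions names:

```python
import os

# A minimal sketch, assuming the standard ConfigOptions-to-environment
# mapping (FEAST_<OPTION_NAME>), as already used for FEAST_REDIS_HOST above.
os.environ["FEAST_BIGTABLE_PROJECT"] = "my-gcp-project"   # hypothetical project id
os.environ["FEAST_BIGTABLE_INSTANCE"] = "feast-online"    # hypothetical instance id

# REDIS_HOST/REDIS_PORT now default to "", so leaving them unset
# makes the launcher skip the --redis flag and rely on Bigtable.
```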
43 changes: 32 additions & 11 deletions python/feast_spark/pyspark/abc.py
@@ -328,9 +328,11 @@ def __init__(
feature_table: Dict,
source: Dict,
jar: str,
-redis_host: str,
-redis_port: int,
-redis_ssl: bool,
+redis_host: Optional[str] = None,
+redis_port: Optional[int] = None,
+redis_ssl: Optional[bool] = None,
+bigtable_project: Optional[str] = None,
+bigtable_instance: Optional[str] = None,
statsd_host: Optional[str] = None,
statsd_port: Optional[int] = None,
deadletter_path: Optional[str] = None,
@@ -343,6 +345,8 @@ def __init__(
self._redis_host = redis_host
self._redis_port = redis_port
self._redis_ssl = redis_ssl
+self._bigtable_project = bigtable_project
+self._bigtable_instance = bigtable_instance
self._statsd_host = statsd_host
self._statsd_port = statsd_port
self._deadletter_path = deadletter_path
@@ -352,6 +356,11 @@ def __init__(
def _get_redis_config(self):
return dict(host=self._redis_host, port=self._redis_port, ssl=self._redis_ssl)

+def _get_bigtable_config(self):
+return dict(
+project_id=self._bigtable_project, instance_id=self._bigtable_instance
+)

def _get_statsd_config(self):
return (
dict(host=self._statsd_host, port=self._statsd_port)
@@ -377,10 +386,14 @@ def get_arguments(self) -> List[str]:
json.dumps(self._feature_table),
"--source",
json.dumps(self._source),
"--redis",
json.dumps(self._get_redis_config()),
]

+if self._redis_host and self._redis_port:
+args.extend(["--redis", json.dumps(self._get_redis_config())])
+
+if self._bigtable_project and self._bigtable_instance:
+args.extend(["--bigtable", json.dumps(self._get_bigtable_config())])

if self._get_statsd_config():
args.extend(["--statsd", json.dumps(self._get_statsd_config())])

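Because each store flag is now emitted only when its options are set, a job configured for Bigtable alone receives --bigtable and no --redis. A sketch of the resulting argument list; the payload values are illustrative stand-ins, not the real serialized specs:

```python
import json

# Illustrative payloads; a real job serializes the full feature table and source specs.
feature_table = {"name": "driver_stats", "project": "default"}
source = {"type": "BATCH_FILE", "path": "gs://bucket/driver_stats"}
bigtable = {"project_id": "my-gcp-project", "instance_id": "feast-online"}

args = [
    "--feature-table", json.dumps(feature_table),
    "--source", json.dumps(source),
    # Emitted only because both bigtable_project and bigtable_instance are set:
    "--bigtable", json.dumps(bigtable),
]
```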
@@ -409,9 +422,11 @@ def __init__(
start: datetime,
end: datetime,
jar: str,
-redis_host: str,
-redis_port: int,
-redis_ssl: bool,
+redis_host: Optional[str],
+redis_port: Optional[int],
+redis_ssl: Optional[bool],
+bigtable_project: Optional[str],
+bigtable_instance: Optional[str],
statsd_host: Optional[str] = None,
statsd_port: Optional[int] = None,
deadletter_path: Optional[str] = None,
@@ -424,6 +439,8 @@ def __init__(
redis_host,
redis_port,
redis_ssl,
+bigtable_project,
+bigtable_instance,
statsd_host,
statsd_port,
deadletter_path,
@@ -459,9 +476,11 @@ def __init__(
source: Dict,
jar: str,
extra_jars: List[str],
-redis_host: str,
-redis_port: int,
-redis_ssl: bool,
+redis_host: Optional[str],
+redis_port: Optional[int],
+redis_ssl: Optional[bool],
+bigtable_project: Optional[str],
+bigtable_instance: Optional[str],
statsd_host: Optional[str] = None,
statsd_port: Optional[int] = None,
deadletter_path: Optional[str] = None,
@@ -476,6 +495,8 @@ def __init__(
redis_host,
redis_port,
redis_ssl,
+bigtable_project,
+bigtable_instance,
statsd_host,
statsd_port,
deadletter_path,
10 changes: 8 additions & 2 deletions python/feast_spark/pyspark/launcher.py
@@ -271,8 +271,11 @@ def start_offline_to_online_ingestion(
start=start,
end=end,
redis_host=client.config.get(opt.REDIS_HOST),
redis_port=client.config.getint(opt.REDIS_PORT),
redis_port=bool(client.config.get(opt.REDIS_HOST))
and client.config.getint(opt.REDIS_PORT),
redis_ssl=client.config.getboolean(opt.REDIS_SSL),
bigtable_project=client.config.get(opt.BIGTABLE_PROJECT),
bigtable_instance=client.config.get(opt.BIGTABLE_INSTANCE),
statsd_host=(
client.config.getboolean(opt.STATSD_ENABLED)
and client.config.get(opt.STATSD_HOST)
@@ -296,8 +299,11 @@ def get_stream_to_online_ingestion_params(
source=_source_to_argument(feature_table.stream_source, client.config),
feature_table=_feature_table_to_argument(client, project, feature_table),
redis_host=client.config.get(opt.REDIS_HOST),
-redis_port=client.config.getint(opt.REDIS_PORT),
+redis_port=bool(client.config.get(opt.REDIS_HOST))
+and client.config.getint(opt.REDIS_PORT),
redis_ssl=client.config.getboolean(opt.REDIS_SSL),
+bigtable_project=client.config.get(opt.BIGTABLE_PROJECT),
+bigtable_instance=client.config.get(opt.BIGTABLE_INSTANCE),
statsd_host=client.config.getboolean(opt.STATSD_ENABLED)
and client.config.get(opt.STATSD_HOST),
statsd_port=client.config.getboolean(opt.STATSD_ENABLED)
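The bool(host) and getint(port) construction is deliberate: with REDIS_HOST now defaulting to an empty string, the expression short-circuits to False before getint parses the (possibly empty) port string, and the falsy redis_port in turn makes get_arguments omit the --redis flag. A standalone illustration of the idiom:

```python
def resolve_port(host: str, port: str):
    # Mirrors the launcher's idiom: short-circuits to False when host is
    # empty, so int() is never called on an empty port string.
    return bool(host) and int(port)

assert resolve_port("", "") is False                  # no Redis -> falsy, --redis omitted
assert resolve_port("10.128.0.105", "6379") == 6379   # Redis configured -> real port
```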
1 change: 1 addition & 0 deletions python/feast_spark/pyspark/launchers/k8s/k8s.py
@@ -287,6 +287,7 @@ def _upload_jar(self, jar_path: str) -> str:
jar_path.startswith("s3://")
or jar_path.startswith("s3a://")
or jar_path.startswith("https://")
or jar_path.startswith("local://")
):
return jar_path
elif jar_path.startswith("file://"):
30 changes: 30 additions & 0 deletions spark/ingestion/pom.xml
@@ -98,6 +98,36 @@
<scope>provided</scope>
</dependency>

+<dependency>
+<groupId>com.google.cloud.bigtable</groupId>
+<artifactId>bigtable-hbase-2.x-hadoop</artifactId>
+<version>1.19.0</version>
+<exclusions>
+<exclusion>
+<groupId>org.apache.hadoop</groupId>
+<artifactId>hadoop-common</artifactId>
+</exclusion>
+</exclusions>
+</dependency>
+
+<dependency>
+<groupId>org.apache.hbase</groupId>
+<artifactId>hbase-client</artifactId>
+<version>${hbase.version}</version>
+</dependency>
+
+<dependency>
+<groupId>org.apache.hbase</groupId>
+<artifactId>hbase-mapreduce</artifactId>
+<version>${hbase.version}</version>
+</dependency>
+
+<dependency>
+<groupId>com.fasterxml.jackson.core</groupId>
+<artifactId>jackson-core</artifactId>
+<version>2.10.0</version>
+</dependency>

<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala
@@ -38,6 +38,10 @@ object BasePipeline {
.set("spark.redis.host", host)
.set("spark.redis.port", port.toString)
.set("spark.redis.ssl", ssl.toString)
+case BigTableConfig(projectId, instanceId) =>
+conf
+.set("spark.bigtable.projectId", projectId)
+.set("spark.bigtable.instanceId", instanceId)
}

jobConfig.metrics match {
spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala
@@ -77,7 +77,10 @@ object BatchPipeline extends BasePipeline {
.filter(rowValidator.allChecks)

validRows.write
.format("feast.ingestion.stores.redis")
.format(config.store match {
case _: RedisConfig => "feast.ingestion.stores.redis"
case _: BigTableConfig => "feast.ingestion.stores.bigtable"
})
.option("entity_columns", featureTable.entities.map(_.name).mkString(","))
.option("namespace", featureTable.name)
.option("project_name", featureTable.project)
spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala
@@ -81,6 +81,9 @@ object IngestionJob {
opt[String](name = "redis")
.action((x, c) => c.copy(store = parseJSON(x).extract[RedisConfig]))

opt[String](name = "bigtable")
.action((x, c) => c.copy(store = parseJSON(x).camelizeKeys.extract[BigTableConfig]))

opt[String](name = "statsd")
.action((x, c) => c.copy(metrics = Some(parseJSON(x).extract[StatsDConfig])))

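Note the camelizeKeys on the Bigtable path only: the Python side serializes snake_case keys (project_id, instance_id), which must become projectId/instanceId to match BigTableConfig's fields. A small round-trip check; the camelize helper here is a stand-in for json4s' camelizeKeys:

```python
import json

def camelize(key: str) -> str:
    # Stand-in for json4s camelizeKeys: snake_case -> camelCase.
    head, *rest = key.split("_")
    return head + "".join(word.capitalize() for word in rest)

sent = json.dumps({"project_id": "my-gcp-project", "instance_id": "feast-online"})
received = {camelize(k): v for k, v in json.loads(sent).items()}
assert received == {"projectId": "my-gcp-project", "instanceId": "feast-online"}
```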
spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala
@@ -26,7 +26,8 @@ object Modes extends Enumeration {

abstract class StoreConfig

case class RedisConfig(host: String, port: Int, ssl: Boolean) extends StoreConfig
+case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig

sealed trait MetricConfig

spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala
@@ -16,6 +16,9 @@
*/
package feast.ingestion

+import java.io.File
+import java.util.concurrent.TimeUnit

import feast.ingestion.metrics.IngestionPipelineMetrics
import feast.ingestion.registry.proto.ProtoRegistryFactory
import feast.ingestion.utils.ProtoReflection
@@ -34,9 +37,6 @@ import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.{SparkEnv, SparkFiles}

-import java.io.File
-import java.util.concurrent.TimeUnit

/**
* Streaming pipeline (currently in micro-batches mode only, since we need to have multiple sinks: redis & deadletters).
* Flow:
@@ -111,7 +111,10 @@ object StreamingPipeline extends BasePipeline with Serializable {
.map(metrics.incrementRead)
.filter(if (config.doNotIngestInvalidRows) expr("_isValid") else rowValidator.allChecks)
.write
.format("feast.ingestion.stores.redis")
.format(config.store match {
case _: RedisConfig => "feast.ingestion.stores.redis"
case _: BigTableConfig => "feast.ingestion.stores.bigtable"
})
.option("entity_columns", featureTable.entities.map(_.name).mkString(","))
.option("namespace", featureTable.name)
.option("project_name", featureTable.project)
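From a caller's perspective nothing changes: the client reads store settings from configuration and the pipelines pick the matching sink format. A hedged usage sketch, assuming the Feast 0.9-era feast/feast_spark client entry points:

```python
import feast
import feast_spark

# Hedged sketch: with FEAST_BIGTABLE_PROJECT/FEAST_BIGTABLE_INSTANCE set and
# FEAST_REDIS_HOST left empty, the launched job writes to
# feast.ingestion.stores.bigtable instead of the Redis store.
feast_client = feast.Client()
spark_client = feast_spark.Client(feast_client)

feature_table = feast_client.get_feature_table("driver_stats")
job = spark_client.start_stream_to_online_ingestion(feature_table)
```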
