Skip to content

Commit

Permalink
Port telemetry to FeatureStore API (#1446)
Browse files Browse the repository at this point in the history
* Port telemetry to FeatureStore API

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* formatting

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* lint

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Add telemetry to all FeatureStore methods

Signed-off-by: Jacob Klegar <jacob@tecton.ai>

* Move telemetry into module

Signed-off-by: Willem Pienaar <git@willem.co>

* Remove version code

Signed-off-by: Willem Pienaar <git@willem.co>

* Fix broken telemetry tests

Signed-off-by: Willem Pienaar <git@willem.co>

* Remove debug code

Signed-off-by: Willem Pienaar <git@willem.co>

Co-authored-by: Willem Pienaar <git@willem.co>
  • Loading branch information
jklegar and woop committed Apr 10, 2021
1 parent 41a7a4a commit f371c2d
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 113 deletions.
71 changes: 7 additions & 64 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,8 @@
# limitations under the License.
import logging
import multiprocessing
import os
import shutil
import uuid
import warnings
from datetime import datetime
from os.path import expanduser, join
from typing import Any, Dict, List, Optional, Union

import grpc
Expand Down Expand Up @@ -73,7 +69,7 @@
)
from feast.protos.feast.serving.ServingService_pb2_grpc import ServingServiceStub
from feast.registry import Registry
from feast.telemetry import log_usage
from feast.telemetry import Telemetry

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -121,7 +117,7 @@ def __init__(self, options: Optional[Dict[str, str]] = None, **kwargs):
if self._config.getboolean(opt.ENABLE_AUTH):
self._auth_metadata = feast_auth.get_auth_metadata_plugin(self._config)

self._configure_telemetry()
self._tele = Telemetry()

@property
def config(self) -> Config:
Expand Down Expand Up @@ -351,27 +347,6 @@ def version(self, sdk_only=False):

return result

def _configure_telemetry(self):
telemetry_filepath = join(expanduser("~"), ".feast", "telemetry")
self._telemetry_enabled = (
self._config.get(opt.TELEMETRY, "True") == "True"
) # written this way to turn the env var string into a boolean
if self._telemetry_enabled:
self._telemetry_counter = {"get_online_features": 0}
if os.path.exists(telemetry_filepath):
with open(telemetry_filepath, "r") as f:
self._telemetry_id = f.read()
else:
self._telemetry_id = str(uuid.uuid4())
print(
"Feast is an open source project that collects anonymized usage statistics. To opt out or learn more see https://docs.feast.dev/v/master/advanced/telemetry"
)
with open(telemetry_filepath, "w") as f:
f.write(self._telemetry_id)
else:
if os.path.exists(telemetry_filepath):
os.remove(telemetry_filepath)

@property
def project(self) -> str:
"""
Expand Down Expand Up @@ -492,13 +467,7 @@ def apply(
>>> feast_client.apply(entity)
"""

if self._telemetry_enabled:
log_usage(
"apply",
self._telemetry_id,
datetime.utcnow(),
self.version(sdk_only=True),
)
self._tele.log("apply")
if project is None:
project = self.project

Expand Down Expand Up @@ -612,13 +581,7 @@ def get_entity(self, name: str, project: str = None) -> Entity:
none is found
"""

if self._telemetry_enabled:
log_usage(
"get_entity",
self._telemetry_id,
datetime.utcnow(),
self.version(sdk_only=True),
)
self._tele.log("get_entity")

if project is None:
project = self.project
Expand Down Expand Up @@ -743,13 +706,7 @@ def get_feature_table(self, name: str, project: str = None) -> FeatureTable:
none is found
"""

if self._telemetry_enabled:
log_usage(
"get_feature_table",
self._telemetry_id,
datetime.utcnow(),
self.version(sdk_only=True),
)
self._tele.log("get_feature_table")

if project is None:
project = self.project
Expand Down Expand Up @@ -890,13 +847,7 @@ def ingest(
>>> client.ingest(driver_ft, ft_df)
"""

if self._telemetry_enabled:
log_usage(
"ingest",
self._telemetry_id,
datetime.utcnow(),
self.version(sdk_only=True),
)
self._tele.log("ingest")
if project is None:
project = self.project
if isinstance(feature_table, str):
Expand Down Expand Up @@ -1021,15 +972,7 @@ def get_online_features(
{'sales:daily_transactions': [1.1,1.2], 'sales:customer_id': [0,1]}
"""

if self._telemetry_enabled:
if self._telemetry_counter["get_online_features"] % 1000 == 0:
log_usage(
"get_online_features",
self._telemetry_id,
datetime.utcnow(),
self.version(sdk_only=True),
)
self._telemetry_counter["get_online_features"] += 1
self._tele.log("get_online_features")
try:
response = self._serving_service.GetOnlineFeaturesV2(
GetOnlineFeaturesRequestV2(
Expand Down
27 changes: 27 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
RepoConfig,
load_repo_config,
)
from feast.telemetry import Telemetry
from feast.type_map import python_value_to_proto_value
from feast.version import get_version


class FeatureStore:
Expand Down Expand Up @@ -74,6 +76,12 @@ def __init__(
registry_path=registry_config.path,
cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds),
)
self._tele = Telemetry()

def version(self) -> str:
"""Returns the version of the current Feast SDK/CLI"""

return get_version()

@property
def project(self) -> str:
Expand All @@ -96,6 +104,7 @@ def refresh_registry(self):
greater than 0, then once the cache becomes stale (more time than the TTL has passed), a new cache will be
downloaded synchronously, which may increase latencies if the triggering method is get_online_features()
"""
self._tele.log("refresh_registry")

registry_config = self.config.get_registry_config()
self._registry = Registry(
Expand All @@ -111,6 +120,8 @@ def list_entities(self) -> List[Entity]:
Returns:
List of entities
"""
self._tele.log("list_entities")

return self._registry.list_entities(self.project)

def list_feature_views(self) -> List[FeatureView]:
Expand All @@ -120,6 +131,8 @@ def list_feature_views(self) -> List[FeatureView]:
Returns:
List of feature views
"""
self._tele.log("list_feature_views")

return self._registry.list_feature_views(self.project)

def get_entity(self, name: str) -> Entity:
Expand All @@ -133,6 +146,8 @@ def get_entity(self, name: str) -> Entity:
Returns either the specified entity, or raises an exception if
none is found
"""
self._tele.log("get_entity")

return self._registry.get_entity(name, self.project)

def get_feature_view(self, name: str) -> FeatureView:
Expand All @@ -146,6 +161,8 @@ def get_feature_view(self, name: str) -> FeatureView:
Returns either the specified feature view, or raises an exception if
none is found
"""
self._tele.log("get_feature_view")

return self._registry.get_feature_view(name, self.project)

def delete_feature_view(self, name: str):
Expand All @@ -155,6 +172,8 @@ def delete_feature_view(self, name: str):
Args:
name: Name of feature view
"""
self._tele.log("delete_feature_view")

return self._registry.delete_feature_view(name, self.project)

def apply(self, objects: List[Union[FeatureView, Entity]]):
Expand Down Expand Up @@ -186,6 +205,8 @@ def apply(self, objects: List[Union[FeatureView, Entity]]):
>>> fs.apply([customer_entity, customer_feature_view])
"""

self._tele.log("apply")

# TODO: Add locking
# TODO: Optimize by only making a single call (read/write)

Expand Down Expand Up @@ -246,6 +267,7 @@ def get_historical_features(
>>> feature_data = job.to_df()
>>> model.fit(feature_data) # insert your modeling framework here.
"""
self._tele.log("get_historical_features")

all_feature_views = self._registry.list_feature_views(
project=self.config.project
Expand Down Expand Up @@ -282,6 +304,8 @@ def materialize_incremental(
>>> fs = FeatureStore(config=RepoConfig(provider="gcp", registry="gs://my-fs/", project="my_fs_proj"))
>>> fs.materialize_incremental(end_date=datetime.utcnow() - timedelta(minutes=5))
"""
self._tele.log("materialize_incremental")

feature_views_to_materialize = []
if feature_views is None:
feature_views_to_materialize = self._registry.list_feature_views(
Expand Down Expand Up @@ -335,6 +359,8 @@ def materialize(
>>> start_date=datetime.utcnow() - timedelta(hours=3), end_date=datetime.utcnow() - timedelta(minutes=10)
>>> )
"""
self._tele.log("materialize")

feature_views_to_materialize = []
if feature_views is None:
feature_views_to_materialize = self._registry.list_feature_views(
Expand Down Expand Up @@ -424,6 +450,7 @@ def get_online_features(
>>> print(online_response_dict)
{'sales:daily_transactions': [1.1,1.2], 'sales:customer_id': [0,1]}
"""
self._tele.log("get_online_features")

response = self._get_online_features(
feature_refs=feature_refs,
Expand Down
77 changes: 57 additions & 20 deletions sdk/python/feast/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,69 @@

import os
import sys
import uuid
from datetime import datetime
from typing import Dict
from os.path import expanduser, join

import requests

from feast.version import get_version

TELEMETRY_ENDPOINT = (
"https://us-central1-kf-feast.cloudfunctions.net/bq_telemetry_logger"
)


def log_usage(
function_name: str,
telemetry_id: str,
timestamp: datetime,
version: Dict[str, Dict[str, str]],
):
json = {
"function_name": function_name,
"telemetry_id": telemetry_id,
"timestamp": timestamp.isoformat(),
"version": version,
"os": sys.platform,
"is_test": os.getenv("FEAST_IS_TELEMETRY_TEST", "False"),
}
try:
requests.post(TELEMETRY_ENDPOINT, json=json)
except Exception:
pass
return
class Telemetry:
def __init__(self):
telemetry_filepath = join(expanduser("~"), ".feast", "telemetry")
self._telemetry_enabled = (
os.getenv("FEAST_TELEMETRY", default="True") == "True"
) # written this way to turn the env var string into a boolean

self._is_test = os.getenv("FEAST_IS_TELEMETRY_TEST", "False") == "True"

if self._telemetry_enabled:
self._telemetry_counter = {"get_online_features": 0}
if os.path.exists(telemetry_filepath):
with open(telemetry_filepath, "r") as f:
self._telemetry_id = f.read()
else:
self._telemetry_id = str(uuid.uuid4())
with open(telemetry_filepath, "w") as f:
f.write(self._telemetry_id)
print(
"Feast is an open source project that collects anonymized usage statistics. To opt out or learn"
" more see https://docs.feast.dev/v/master/feast-on-kubernetes/advanced-1/telemetry"
)

@property
def telemetry_id(self):
if os.environ["FEAST_FORCE_TELEMETRY_UUID"]:
return os.environ["FEAST_FORCE_TELEMETRY_UUID"]
return self._telemetry_id

def log(self, function_name: str):

if self._telemetry_enabled and self.telemetry_id:
if function_name == "get_online_features":
if self._telemetry_counter["get_online_features"] % 10000 != 0:
self._telemetry_counter["get_online_features"] += 1
return

json = {
"function_name": function_name,
"telemetry_id": self.telemetry_id,
"timestamp": datetime.utcnow().isoformat(),
"version": get_version(),
"os": sys.platform,
"is_test": self._is_test,
}
try:
requests.post(TELEMETRY_ENDPOINT, json=json)
except Exception as e:
if self._is_test:
raise e
else:
pass
return
13 changes: 13 additions & 0 deletions sdk/python/feast/version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import pkg_resources


def get_version():
"""
Returns version information of the Feast Python Package
"""

try:
sdk_version = pkg_resources.get_distribution("feast").version
except pkg_resources.DistributionNotFound:
sdk_version = "unknown"
return sdk_version
1 change: 1 addition & 0 deletions sdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
"pytest-mock==1.10.4",
"Sphinx",
"sphinx-rtd-theme",
"tenacity",
"adlfs==0.5.9",
"firebase-admin==4.5.2",
"google-cloud-datastore==2.1.0",
Expand Down

0 comments on commit f371c2d

Please sign in to comment.