Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add server telemetry #1687

Merged
merged 31 commits into from Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
b50b27d
docs: Add base documentation
dvsrepo Aug 20, 2022
6dfbf04
wip: define telemetry components
frascuchon Aug 23, 2022
c78a4e5
refactor: using client instance
frascuchon Aug 25, 2022
1d97e69
feat: track after login
frascuchon Aug 25, 2022
294c7b6
feat: track add records
frascuchon Aug 25, 2022
792a6b0
tests: disable send for now
frascuchon Aug 25, 2022
75b7ac9
chore: add missing dep
frascuchon Aug 25, 2022
fe0a134
fix: server id as string
frascuchon Aug 25, 2022
a717c27
tests: add tests
frascuchon Aug 25, 2022
a3be2e1
tests: track_data fixture
frascuchon Aug 25, 2022
6c83f33
fix: telemetry requires async
frascuchon Aug 25, 2022
715755b
tests: include more tests
frascuchon Aug 25, 2022
6d1f3f5
fix: prevent any import error
frascuchon Aug 25, 2022
fdbb9d6
tests: disable sending telemetry events for tests
frascuchon Aug 25, 2022
3fa7006
chore: adapt event names
frascuchon Aug 25, 2022
24b1bd8
chore: Normalize event naming
frascuchon Aug 25, 2022
69ecc53
chore: telemetry key as env var
frascuchon Aug 26, 2022
ac28e3d
docs: include fields and info tracked by the telemetry module.
frascuchon Aug 26, 2022
e241141
fix: keep old fashion env var names
frascuchon Aug 29, 2022
1a8684c
feat: include request info in telemetry events
frascuchon Aug 29, 2022
789a49a
perf: disable telemetry when host is not accesible
frascuchon Aug 29, 2022
931b604
docs: include tracked headers
frascuchon Aug 29, 2022
0fe656b
tests: conditional mock for telemetry client
frascuchon Aug 29, 2022
639352c
fix: disable ssl verify for telemetry backend check
frascuchon Sep 1, 2022
39408cd
feat: include user_hash in login events
frascuchon Sep 8, 2022
a690594
tests: try changes
frascuchon Sep 8, 2022
8d46b31
revert: prior commit
frascuchon Sep 8, 2022
434328a
tests: try to fix telemetry tests
frascuchon Sep 8, 2022
bfeae65
ci: force cache hit
frascuchon Sep 8, 2022
31258c2
chore: discard wrong analytics version
frascuchon Sep 8, 2022
9fdfc3e
ci: force cache conda hit
frascuchon Sep 8, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/package.yml
Expand Up @@ -55,22 +55,22 @@ jobs:
use-mamba: true
activate-environment: rubrix

- name: Set date for conda cache
- name: Get date for conda cache
if: steps.filter.outputs.python_code == 'true'
id: set-date
id: get-date
run: echo "::set-output name=today::$(/bin/date -u '+%Y%m%d')"
shell: bash

- name: Cache Conda env
if: steps.filter.outputs.python_code == 'true'
uses: actions/cache@v2
id: cache
with:
path: ${{ env.CONDA }}/envs
key: conda-${{ runner.os }}--${{ runner.arch }}--${{ steps.set-date.outputs.today }}-${{ hashFiles('environment_dev.yml') }}-${{ env.CACHE_NUMBER }}
key: conda-${{ runner.os }}--${{ runner.arch }}--${{ steps.get-date.outputs.today }}-${{ hashFiles('environment_dev.yml') }}-${{ env.CACHE_NUMBER }}
env:
# Increase this value to reset cache if etc/example-environment.yml has not changed
CACHE_NUMBER: 0
id: cache
CACHE_NUMBER: 2

- name: Update environment
if: steps.filter.outputs.python_code == 'true' && steps.cache.outputs.cache-hit != 'true'
Expand Down
54 changes: 54 additions & 0 deletions docs/community/telemetry.md
@@ -0,0 +1,54 @@
# Telemetry
Rubrix uses telemetry to report anonymous usage and error information. As an open-source software, this type of information is important to improve and understand how the product is used.

## How to opt-out
You can opt-out of telemetry reporting using the `ENV` variable `RUBRIX_ENABLE_TELEMETRY` before launching the server. Setting this variable to `0` will completely disable telemetry reporting.

If you are a Linux/MacOs users you should run:

```bash
export RUBRIX_ENABLE_TELEMETRY=0
```

If you are Windows users you should run:

```bash
set RUBRIX_ENABLE_TELEMETRY=0
```

To opt-in again, you can set the variable to `1`.

## Why reporting telemetry
Anonymous telemetry information enable us to continously improve the product and detect recurring problems to better serve all users. We collect aggregated information about general usage and errors. We do NOT collect any information of users' data records, datasets, or metadata information.

## Sensitive data
We do not collect any piece of information related to the source data you store in Rubrix. We don't identify individual users. Your data does not leave your server at any time:

* No dataset record is collected.
* No dataset names or metadata are collected.

## Information reported
The following usage and error information is reported:

* The code of the raised error
* The `user-agent` and `accept-language` http headers
* Task name and number of records for bulk operations
* An anonymous generated user uuid
* The rubrix version running the server
* The python version, e.g. `3.8.13`
* The system/OS name, such as `Linux`, `Darwin`, `Windows`
* The system’s release version, e.g. `Darwin Kernel Version 21.5.0: Tue Apr 26 21:08:22 PDT 2022; root:xnu-8020`
* The machine type, e.g. `AMD64`
* The underlying platform spec with as much useful information as possible. (ej. `macOS-10.16-x86_64-i386-64bit`)


This is performed by registering information from the following API methods:

* `/api/me`
* `/api/dataset/{name}/{task}:bulk`
* Raised server API errors


For transparency, you can inspect the source code where this is performed here (add link to the source).

If you have any doubts, don't hesitate to join our [Slack channel](https://join.slack.com/t/rubrixworkspace/shared_invite/zt-whigkyjn-a3IUJLD7gDbTZ0rKlvcJ5g) or open a GitHub issue. We'd be very happy to discuss about how we can improve this.
2 changes: 2 additions & 0 deletions pyproject.toml
Expand Up @@ -66,6 +66,8 @@ server = [
"passlib[bcrypt]~=1.7.4",
# Info status
"psutil ~= 5.8.0",
# Telemetry
"segment-analytics-python != 2.2.1"
]
listeners = [
"schedule ~= 1.1.0",
Expand Down
4 changes: 2 additions & 2 deletions src/rubrix/server/apis/v0/handlers/text2text.py
Expand Up @@ -69,7 +69,7 @@
response_model=BulkResponse,
response_model_exclude_none=True,
)
def bulk_records(
async def bulk_records(
name: str,
bulk: Text2TextBulkRequest,
common_params: CommonTaskHandlerDependencies = Depends(),
Expand Down Expand Up @@ -100,7 +100,7 @@ def bulk_records(
dataset.owner = owner
datasets.create_dataset(user=current_user, dataset=dataset)

result = service.add_records(
result = await service.add_records(
dataset=dataset,
records=[ServiceText2TextRecord.parse_obj(r) for r in bulk.records],
)
Expand Down
2 changes: 1 addition & 1 deletion src/rubrix/server/apis/v0/handlers/text_classification.py
Expand Up @@ -126,7 +126,7 @@ async def bulk_records(
user=current_user, dataset=dataset, records=records
)

result = service.add_records(
result = await service.add_records(
dataset=dataset,
records=records,
)
Expand Down
2 changes: 1 addition & 1 deletion src/rubrix/server/apis/v0/handlers/token_classification.py
Expand Up @@ -121,7 +121,7 @@ async def bulk_records(
records=records,
)

result = service.add_records(
result = await service.add_records(
dataset=dataset,
records=records,
)
Expand Down
16 changes: 10 additions & 6 deletions src/rubrix/server/apis/v0/handlers/users.py
Expand Up @@ -13,26 +13,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from fastapi import APIRouter, Security
from fastapi import APIRouter, Request, Security

from rubrix.server.commons import telemetry
from rubrix.server.security import auth
from rubrix.server.security.model import User

router = APIRouter(tags=["users"])


@router.get(
"/me",
response_model=User,
response_model_exclude_none=True,
operation_id="whoami",
"/me", response_model=User, response_model_exclude_none=True, operation_id="whoami"
)
async def whoami(current_user: User = Security(auth.get_user, scopes=[])):
async def whoami(
request: Request, current_user: User = Security(auth.get_user, scopes=[])
):
"""
User info endpoint

Parameters
----------
request:
The original request
current_user:
The current request user

Expand All @@ -41,4 +43,6 @@ async def whoami(current_user: User = Security(auth.get_user, scopes=[])):
The current user

"""

await telemetry.track_login(request, username=current_user.username)
return current_user
119 changes: 119 additions & 0 deletions src/rubrix/server/commons/telemetry.py
@@ -0,0 +1,119 @@
import dataclasses
import logging
import platform
import uuid
from typing import Any, Dict, Optional

import httpx
from fastapi import Request

from rubrix.server.commons.models import TaskType
from rubrix.server.errors.base_errors import RubrixServerError
from rubrix.server.settings import settings

try:
from analytics import Client
except ModuleNotFoundError:
# TODO: show some warning info
settings.enable_telemetry = False
Client = None


def _configure_analytics(disable_send: bool = False) -> Client:
API_KEY = settings.telemetry_key or "C6FkcaoCbt78rACAgvyBxGBcMB3dM3nn"
TELEMETRY_HOST = "https://api.segment.io"

# Check host connection
httpx.options(TELEMETRY_HOST, timeout=1, verify=False)

return Client(
write_key=API_KEY,
gzip=True,
host=TELEMETRY_HOST,
send=not disable_send,
max_retries=5,
)


@dataclasses.dataclass
class _TelemetryClient:

client: Client

__INSTANCE__: "_TelemetryClient" = None
__server_id__: Optional[uuid.UUID] = dataclasses.field(init=False, default=None)

@property
def server_id(self) -> uuid.UUID:
return self.__server_id__

@classmethod
def get(cls):
if settings.enable_telemetry:
if cls.__INSTANCE__ is None:
try:
cls.__INSTANCE__ = cls(client=_configure_analytics())
except Exception as err:
logging.getLogger(__name__).warning(
f"Cannot initialize telemetry. Error: {err}. Disabling..."
)
settings.enable_telemetry = False
return None
return cls.__INSTANCE__

def __post_init__(self):

from rubrix import __version__

self.__server_id__ = uuid.UUID(int=uuid.getnode())
self.__server_id_str__ = str(self.__server_id__)
self.__system_info__ = {
"system": platform.system(),
"machine": platform.machine(),
"platform": platform.platform(),
"python_version": platform.python_version(),
"sys_version": platform.version(),
"rubrix_version": __version__,
}

def track_data(
self, action: str, data: Dict[str, Any], include_system_info: bool = True
):
event_data = data.copy()
if include_system_info:
event_data.update(self.__system_info__)
self.client.track(self.__server_id_str__, action, event_data)


def _process_request_info(request: Request):
return {
header: request.headers.get(header)
for header in ["user-agent", "accept-language"]
}


async def track_error(error: RubrixServerError, request: Request):
client = _TelemetryClient.get()
if client:
client.track_data(
"ServerErrorFound", {"code": error.code, **_process_request_info(request)}
)


async def track_bulk(task: TaskType, records: int):
client = _TelemetryClient.get()
if client:
client.track_data("LogRecordsRequested", {"task": task, "records": records})


async def track_login(request: Request, username: str):
client = _TelemetryClient.get()
if client:
client.track_data(
"UserInfoRequested",
{
"is_default_user": username == "rubrix",
"user_hash": str(uuid.uuid5(namespace=client.server_id, name=username)),
**_process_request_info(request),
},
)
3 changes: 3 additions & 0 deletions src/rubrix/server/errors/api_errors.py
Expand Up @@ -5,6 +5,7 @@
from fastapi.exception_handlers import http_exception_handler
from pydantic import BaseModel

from rubrix.server.commons import telemetry
from rubrix.server.errors.adapter import exception_to_rubrix_error
from rubrix.server.errors.base_errors import RubrixServerError

Expand Down Expand Up @@ -40,6 +41,8 @@ class APIErrorHandler:
async def common_exception_handler(request: Request, error: Exception):
"""Wraps errors as custom generic error"""
rubrix_error = exception_to_rubrix_error(error)
await telemetry.track_error(rubrix_error, request=request)

return await http_exception_handler(
request, RubrixServerHTTPException(rubrix_error)
)
5 changes: 4 additions & 1 deletion src/rubrix/server/services/storage/service.py
Expand Up @@ -2,6 +2,7 @@

from fastapi import Depends

from rubrix.server.commons import telemetry
from rubrix.server.commons.config import TasksFactory
from rubrix.server.daos.records import DatasetRecordsDAO
from rubrix.server.services.datasets import ServiceDataset
Expand All @@ -24,13 +25,15 @@ def get_instance(
def __init__(self, dao: DatasetRecordsDAO):
self.__dao__ = dao

def store_records(
async def store_records(
self,
dataset: ServiceDataset,
records: List[ServiceRecord],
record_type: Type[ServiceRecord],
) -> int:
"""Store a set of records"""
await telemetry.track_bulk(task=dataset.task, records=len(records))

metrics = TasksFactory.get_task_metrics(dataset.task)
if metrics:
for record in records:
Expand Down
4 changes: 2 additions & 2 deletions src/rubrix/server/services/tasks/text2text/service.py
Expand Up @@ -60,12 +60,12 @@ def __init__(
self.__storage__ = storage
self.__search__ = search

def add_records(
async def add_records(
self,
dataset: ServiceText2TextDataset,
records: List[ServiceText2TextRecord],
):
failed = self.__storage__.store_records(
failed = await self.__storage__.store_records(
dataset=dataset,
records=records,
record_type=ServiceText2TextRecord,
Expand Down
Expand Up @@ -67,15 +67,15 @@ def __init__(
self.__search__ = search
self.__labeling__ = labeling

def add_records(
async def add_records(
self,
dataset: ServiceTextClassificationDataset,
records: List[ServiceTextClassificationRecord],
):
# TODO(@frascuchon): This will moved to dataset settings validation once DatasetSettings join the game!
self._check_multi_label_integrity(dataset, records)

failed = self.__storage__.store_records(
failed = await self.__storage__.store_records(
dataset=dataset,
records=records,
record_type=ServiceTextClassificationRecord,
Expand Down
Expand Up @@ -57,12 +57,12 @@ def __init__(
self.__storage__ = storage
self.__search__ = search

def add_records(
async def add_records(
self,
dataset: ServiceTokenClassificationDataset,
records: List[ServiceTokenClassificationRecord],
):
failed = self.__storage__.store_records(
failed = await self.__storage__.store_records(
dataset=dataset,
records=records,
record_type=ServiceTokenClassificationRecord,
Expand Down