feat(Server): Add server telemetry (#1687)
(cherry picked from commit d57afd1)

Some changes related to the usage telemetry:

- Passing system data as event context (will simplify later integrations)
- Show a warning message on server start to clearly inform users
- Include the Telemetry section inside the Reference doc.
dvsrepo authored and frascuchon committed Oct 5, 2022
1 parent 9613742 commit 106a8bd
Showing 21 changed files with 338 additions and 45 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/package.yml
@@ -55,22 +55,22 @@ jobs:
use-mamba: true
activate-environment: rubrix

- name: Set date for conda cache
- name: Get date for conda cache
if: steps.filter.outputs.python_code == 'true'
id: set-date
id: get-date
run: echo "::set-output name=today::$(/bin/date -u '+%Y%m%d')"
shell: bash

- name: Cache Conda env
if: steps.filter.outputs.python_code == 'true'
uses: actions/cache@v2
id: cache
with:
path: ${{ env.CONDA }}/envs
key: conda-${{ runner.os }}--${{ runner.arch }}--${{ steps.set-date.outputs.today }}-${{ hashFiles('environment_dev.yml') }}-${{ env.CACHE_NUMBER }}
key: conda-${{ runner.os }}--${{ runner.arch }}--${{ steps.get-date.outputs.today }}-${{ hashFiles('environment_dev.yml') }}-${{ env.CACHE_NUMBER }}
env:
# Increase this value to reset cache if etc/example-environment.yml has not changed
CACHE_NUMBER: 0
id: cache
CACHE_NUMBER: 2

- name: Update environment
if: steps.filter.outputs.python_code == 'true' && steps.cache.outputs.cache-hit != 'true'
1 change: 1 addition & 0 deletions docs/index.md
@@ -172,6 +172,7 @@ You can join the conversation on Slack! We are a very friendly and inclusive com
:caption: Reference
:hidden:

reference/telemetry
reference/python/index
reference/webapp/index

54 changes: 54 additions & 0 deletions docs/reference/telemetry.md
@@ -0,0 +1,54 @@
# Telemetry
Rubrix uses telemetry to report anonymous usage and error information. For an open-source project, this type of information is important for understanding how the product is used and for improving it.

## How to opt out
You can opt out of telemetry reporting by setting the environment variable `RUBRIX_ENABLE_TELEMETRY` before launching the server. Setting this variable to `0` completely disables telemetry reporting.

If you are a Linux/macOS user, run:

```bash
export RUBRIX_ENABLE_TELEMETRY=0
```

If you are a Windows user, run:

```bash
set RUBRIX_ENABLE_TELEMETRY=0
```

To opt in again, set the variable to `1`.
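
If you launch the server from a Python script rather than a shell, the same setting can be passed through the child process environment. The following is a minimal sketch, not part of Rubrix: `env_without_telemetry` is a hypothetical helper, and the launch command shown in the comment is an assumption about how you start the server.

```python
import os
from typing import Dict, Mapping, Optional


def env_without_telemetry(base: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Return a copy of the environment with telemetry reporting disabled.

    Hypothetical helper for illustration; it is not part of Rubrix.
    """
    # Copy so that the current process environment is left untouched.
    env = dict(os.environ if base is None else base)
    env["RUBRIX_ENABLE_TELEMETRY"] = "0"
    return env


# Usage (assumption: the server is started with `python -m rubrix`):
# import subprocess, sys
# subprocess.run([sys.executable, "-m", "rubrix"], env=env_without_telemetry())
```

Working on a copy of the environment keeps the opt-out scoped to the server process instead of changing the calling script's own settings.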

## Why we report telemetry
Anonymous telemetry information enables us to continuously improve the product and detect recurring problems to better serve all users. We collect aggregated information about general usage and errors. We do NOT collect any information about users' data records, datasets, or metadata.

## Sensitive data
We do not collect any piece of information related to the source data you store in Rubrix. We don't identify individual users. Your data does not leave your server at any time:

* No dataset record is collected.
* No dataset names or metadata are collected.

## Information reported
The following usage and error information is reported:

* The code of the raised error
* The `user-agent` and `accept-language` HTTP headers
* Task name and number of records for bulk operations
* An anonymously generated user UUID
* The Rubrix version the server is running
* The Python version, e.g. `3.8.13`
* The system/OS name, such as `Linux`, `Darwin`, `Windows`
* The system’s release version, e.g. `Darwin Kernel Version 21.5.0: Tue Apr 26 21:08:22 PDT 2022; root:xnu-8020`
* The machine type, e.g. `AMD64`
* The underlying platform spec with as much useful information as possible, e.g. `macOS-10.16-x86_64-i386-64bit`
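
Most of the system fields in this list can be read from Python's standard `platform` module. As a rough sketch of what gets gathered (the function name `collect_system_info` is made up for illustration, and the Rubrix version is left out because it requires the package to be installed):

```python
import platform
from typing import Dict


def collect_system_info() -> Dict[str, str]:
    """Gather the non-identifying system fields described in the list above."""
    return {
        "system": platform.system(),  # e.g. "Linux", "Darwin", "Windows"
        "machine": platform.machine(),  # e.g. "AMD64"
        "platform": platform.platform(),  # e.g. "macOS-10.16-x86_64-i386-64bit"
        "python_version": platform.python_version(),  # e.g. "3.8.13"
        "sys_version": platform.version(),  # the system's release version
    }


if __name__ == "__main__":
    # Print the values for the current machine so they can be inspected.
    for key, value in collect_system_info().items():
        print(f"{key}: {value}")
```

Running the snippet locally is a quick way to see exactly which values your own machine would report.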


This information is collected when the following API methods are called or events occur:

* `/api/me`
* `/api/dataset/{name}/{task}:bulk`
* Raised server API errors
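
To make the `/api/me` case concrete, here is a hedged sketch of how such an event could be assembled without exposing the username. It uses only the standard library; `build_login_event`, `REPORTED_HEADERS`, and the fixed `server_id` value are illustrative assumptions, not the server's actual code. The username is hashed with `uuid.uuid5` under a per-server namespace, and only the two HTTP headers listed above are copied.

```python
import uuid
from typing import Any, Dict, Mapping

# Only these request headers are copied into the event.
REPORTED_HEADERS = ("user-agent", "accept-language")


def build_login_event(
    server_id: uuid.UUID, username: str, headers: Mapping[str, str]
) -> Dict[str, Any]:
    """Build anonymous event properties for a user info request."""
    properties: Dict[str, Any] = {
        # Deterministic per server, but the username itself is never included.
        "user_hash": str(uuid.uuid5(server_id, username)),
    }
    # Copy only the allowed headers; missing ones are reported as None.
    for header in REPORTED_HEADERS:
        properties[header] = headers.get(header)
    return properties


# Assumption: a fixed namespace for the example; a server could derive its own,
# for instance from uuid.getnode().
server_id = uuid.UUID(int=0x1234)
event = build_login_event(
    server_id,
    "alice",
    {"user-agent": "python-httpx/0.23", "authorization": "Bearer secret"},
)
```

Because the hash is namespaced by the server identifier, the same username produces unrelated values on different servers, while repeated requests on one server still map to the same anonymous identifier.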


For transparency, you can inspect the source code where this is performed here (add link to the source).

If you have any questions, don't hesitate to join our [Slack channel](https://join.slack.com/t/rubrixworkspace/shared_invite/zt-whigkyjn-a3IUJLD7gDbTZ0rKlvcJ5g) or open a GitHub issue. We'd be very happy to discuss how we can improve this.
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -66,6 +66,8 @@ server = [
"passlib[bcrypt]~=1.7.4",
# Info status
"psutil ~= 5.8.0",
# Telemetry
"segment-analytics-python != 2.2.1"
]
listeners = [
"schedule ~= 1.1.0",
4 changes: 2 additions & 2 deletions src/rubrix/server/apis/v0/handlers/text2text.py
@@ -69,7 +69,7 @@
response_model=BulkResponse,
response_model_exclude_none=True,
)
def bulk_records(
async def bulk_records(
name: str,
bulk: Text2TextBulkRequest,
common_params: CommonTaskHandlerDependencies = Depends(),
@@ -100,7 +100,7 @@ def bulk_records(
dataset.owner = owner
datasets.create_dataset(user=current_user, dataset=dataset)

result = service.add_records(
result = await service.add_records(
dataset=dataset,
records=[ServiceText2TextRecord.parse_obj(r) for r in bulk.records],
)
2 changes: 1 addition & 1 deletion src/rubrix/server/apis/v0/handlers/text_classification.py
@@ -126,7 +126,7 @@ async def bulk_records(
user=current_user, dataset=dataset, records=records
)

result = service.add_records(
result = await service.add_records(
dataset=dataset,
records=records,
)
2 changes: 1 addition & 1 deletion src/rubrix/server/apis/v0/handlers/token_classification.py
@@ -121,7 +121,7 @@ async def bulk_records(
records=records,
)

result = service.add_records(
result = await service.add_records(
dataset=dataset,
records=records,
)
16 changes: 10 additions & 6 deletions src/rubrix/server/apis/v0/handlers/users.py
@@ -13,26 +13,28 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from fastapi import APIRouter, Security
from fastapi import APIRouter, Request, Security

from rubrix.server.commons import telemetry
from rubrix.server.security import auth
from rubrix.server.security.model import User

router = APIRouter(tags=["users"])


@router.get(
"/me",
response_model=User,
response_model_exclude_none=True,
operation_id="whoami",
"/me", response_model=User, response_model_exclude_none=True, operation_id="whoami"
)
async def whoami(current_user: User = Security(auth.get_user, scopes=[])):
async def whoami(
request: Request, current_user: User = Security(auth.get_user, scopes=[])
):
"""
User info endpoint
Parameters
----------
request:
The original request
current_user:
The current request user
@@ -41,4 +43,6 @@ async def whoami(current_user: User = Security(auth.get_user, scopes=[])):
The current user
"""

await telemetry.track_login(request, username=current_user.username)
return current_user
122 changes: 122 additions & 0 deletions src/rubrix/server/commons/telemetry.py
@@ -0,0 +1,122 @@
import dataclasses
import logging
import platform
import uuid
from typing import Any, Dict, Optional

import httpx
from fastapi import Request

from rubrix.server.commons.models import TaskType
from rubrix.server.errors.base_errors import RubrixServerError
from rubrix.server.settings import settings

try:
    from analytics import Client
except ModuleNotFoundError:
    # TODO: show some warning info
    settings.enable_telemetry = False
    Client = None


def _configure_analytics(disable_send: bool = False) -> Client:
    API_KEY = settings.telemetry_key or "C6FkcaoCbt78rACAgvyBxGBcMB3dM3nn"
    TELEMETRY_HOST = "https://api.segment.io"

    # Check host connection
    httpx.options(TELEMETRY_HOST, timeout=1, verify=False)

    return Client(
        write_key=API_KEY,
        gzip=True,
        host=TELEMETRY_HOST,
        send=not disable_send,
        max_retries=5,
    )


@dataclasses.dataclass
class _TelemetryClient:

    client: Client

    __INSTANCE__: "_TelemetryClient" = None
    __server_id__: Optional[uuid.UUID] = dataclasses.field(init=False, default=None)

    @property
    def server_id(self) -> uuid.UUID:
        return self.__server_id__

    @classmethod
    def get(cls):
        if settings.enable_telemetry:
            if cls.__INSTANCE__ is None:
                try:
                    cls.__INSTANCE__ = cls(client=_configure_analytics())
                except Exception as err:
                    logging.getLogger(__name__).warning(
                        f"Cannot initialize telemetry. Error: {err}. Disabling..."
                    )
                    settings.enable_telemetry = False
                    return None
            return cls.__INSTANCE__

    def __post_init__(self):

        from rubrix import __version__

        self.__server_id__ = uuid.UUID(int=uuid.getnode())
        self.__server_id_str__ = str(self.__server_id__)
        self.__system_info__ = {
            "system": platform.system(),
            "machine": platform.machine(),
            "platform": platform.platform(),
            "python_version": platform.python_version(),
            "sys_version": platform.version(),
            "version": __version__,
        }

    def track_data(
        self, action: str, data: Dict[str, Any], include_system_info: bool = True
    ):
        event_data = data.copy()
        self.client.track(
            user_id=self.__server_id_str__,
            event=action,
            properties=event_data,
            context=self.__system_info__ if include_system_info else {},
        )


def _process_request_info(request: Request):
    return {
        header: request.headers.get(header)
        for header in ["user-agent", "accept-language"]
    }


async def track_error(error: RubrixServerError, request: Request):
    client = _TelemetryClient.get()
    if client:
        client.track_data(
            "ServerErrorFound", {"code": error.code, **_process_request_info(request)}
        )


async def track_bulk(task: TaskType, records: int):
    client = _TelemetryClient.get()
    if client:
        client.track_data("LogRecordsRequested", {"task": task, "records": records})


async def track_login(request: Request, username: str):
    client = _TelemetryClient.get()
    if client:
        client.track_data(
            "UserInfoRequested",
            {
                "is_default_user": username == "rubrix",
                "user_hash": str(uuid.uuid5(namespace=client.server_id, name=username)),
                **_process_request_info(request),
            },
        )
3 changes: 3 additions & 0 deletions src/rubrix/server/errors/api_errors.py
@@ -5,6 +5,7 @@
from fastapi.exception_handlers import http_exception_handler
from pydantic import BaseModel

from rubrix.server.commons import telemetry
from rubrix.server.errors.adapter import exception_to_rubrix_error
from rubrix.server.errors.base_errors import RubrixServerError

@@ -40,6 +41,8 @@ class APIErrorHandler:
async def common_exception_handler(request: Request, error: Exception):
"""Wraps errors as custom generic error"""
rubrix_error = exception_to_rubrix_error(error)
await telemetry.track_error(rubrix_error, request=request)

return await http_exception_handler(
request, RubrixServerHTTPException(rubrix_error)
)
33 changes: 33 additions & 0 deletions src/rubrix/server/server.py
@@ -16,7 +16,10 @@
"""
This module configures the global fastapi application
"""
import inspect
import os
import sys
import warnings
from pathlib import Path

from brotli_asgi import BrotliMiddleware
@@ -127,6 +130,35 @@ def configure_app_logging(app: FastAPI):
version=str(rubrix_version),
)


def configure_telemetry(app):
    message = "\n"
    message += inspect.cleandoc(
        """
        Rubrix uses telemetry to report anonymous usage and error information.
        You can know more about what information is reported at:
        https://rubrix.readthedocs.io/en/stable/reference/telemetry.html
        Telemetry is currently enabled. If you want to disable it, you can configure
        the environment variable before relaunching the server:
        """
    )
    message += "\n\n "
    message += (
        "#set RUBRIX_ENABLE_TELEMETRY=0"
        if os.name == "nt"
        else "$>export RUBRIX_ENABLE_TELEMETRY=0"
    )
    message += "\n"

    @app.on_event("startup")
    async def check_telemetry():
        if settings.enable_telemetry:
            print(message, flush=True)


for app_configure in [
configure_app_logging,
configure_middleware,
@@ -135,5 +167,6 @@ def configure_app_logging(app: FastAPI):
configure_api_router,
configure_app_statics,
configure_app_storage,
configure_telemetry,
]:
app_configure(app)
5 changes: 4 additions & 1 deletion src/rubrix/server/services/storage/service.py
@@ -2,6 +2,7 @@

from fastapi import Depends

from rubrix.server.commons import telemetry
from rubrix.server.commons.config import TasksFactory
from rubrix.server.daos.records import DatasetRecordsDAO
from rubrix.server.services.datasets import ServiceDataset
@@ -24,13 +25,15 @@ def get_instance(
def __init__(self, dao: DatasetRecordsDAO):
self.__dao__ = dao

def store_records(
async def store_records(
self,
dataset: ServiceDataset,
records: List[ServiceRecord],
record_type: Type[ServiceRecord],
) -> int:
"""Store a set of records"""
await telemetry.track_bulk(task=dataset.task, records=len(records))

metrics = TasksFactory.get_task_metrics(dataset.task)
if metrics:
for record in records:
