Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions src/firebolt/async_db/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import socket
from json import JSONDecodeError
from types import TracebackType
from typing import Any, Callable, List, Optional, Type
from typing import Any, Callable, Dict, List, Optional, Type

from httpcore.backends.auto import AutoBackend
from httpcore.backends.base import AsyncNetworkStream
Expand All @@ -24,6 +24,7 @@
ACCOUNT_ENGINE_URL,
ACCOUNT_ENGINE_URL_BY_DATABASE_NAME,
)
from firebolt.utils.usage_tracker import get_user_agent_header
from firebolt.utils.util import fix_url_schema

DEFAULT_TIMEOUT_SECONDS: int = 5
Expand Down Expand Up @@ -166,6 +167,7 @@ async def connect_inner(
account_name: Optional[str] = None,
api_endpoint: str = DEFAULT_API_URL,
use_token_cache: bool = True,
additional_parameters: Dict[str, Any] = {},
) -> Connection:
"""Connect to Firebolt database.

Expand All @@ -183,6 +185,8 @@ async def connect_inner(
api_endpoint (str): Firebolt API endpoint. Used for authentication.
use_token_cache (bool): Cached authentication token in filesystem.
Default: True
additional_parameters (Optional[Dict]): Dictionary of less widely-used
arguments for connection.

Note:
Providing both `engine_name` and `engine_url` would result in an error.
Expand Down Expand Up @@ -238,7 +242,9 @@ async def connect_inner(
assert engine_url is not None

engine_url = fix_url_schema(engine_url)
return connection_class(engine_url, database, auth, api_endpoint)
return connection_class(
engine_url, database, auth, api_endpoint, additional_parameters
)

return connect_inner

Expand Down Expand Up @@ -297,17 +303,19 @@ def __init__(
database: str,
auth: Auth,
api_endpoint: str = DEFAULT_API_URL,
additional_parameters: Dict[str, Any] = {},
):
# Override tcp keepalive settings for connection
transport = AsyncHTTPTransport()
transport._pool._network_backend = OverriddenHttpBackend()

connector_versions = additional_parameters.get("connector_versions", [])
self._client = AsyncClient(
auth=auth,
base_url=engine_url,
api_endpoint=api_endpoint,
timeout=Timeout(DEFAULT_TIMEOUT_SECONDS, read=None),
transport=transport,
headers={"User-Agent": get_user_agent_header(connector_versions)},
)
self.api_endpoint = api_endpoint
self.engine_url = engine_url
Expand Down Expand Up @@ -372,6 +380,9 @@ class Connection(BaseConnection):
username: Firebolt account username
password: Firebolt account password
api_endpoint: Optional. Firebolt API endpoint. Used for authentication.
connector_versions: Optional. Tuple of connector name and version or
list of tuples of your connector stack. Useful for tracking custom
connector usage.

Note:
Firebolt currently doesn't support transactions
Expand Down
159 changes: 159 additions & 0 deletions src/firebolt/utils/usage_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import inspect
import logging
from importlib import import_module
from pathlib import Path
from platform import python_version, release, system
from sys import modules
from typing import Dict, List, Optional, Tuple

from pydantic import BaseModel

from firebolt import __version__


class ConnectorVersions(BaseModel):
    """Validation model for user-supplied connector version data.

    Instantiating the model lets pydantic check that ``versions`` is a list
    of ``(connector name, connector version)`` string pairs.
    """

    versions: List[Tuple[str, str]]


# Module-level logger used for usage-tracking diagnostics.
logger = logging.getLogger(__name__)


# Known connectors built on top of this SDK. Every entry is a 4-tuple of:
#   1. connector name reported in the user agent,
#   2. name of the connector function expected on the call stack,
#   3. trailing path of the file that function lives in,
#   4. module to import for ``__version__`` ("" when no version is exposed).
CONNECTOR_MAP: List[Tuple[str, str, Path, str]] = [
    (
        "DBT",
        "open",
        Path("dbt/adapters/firebolt/connections.py"),
        "dbt.adapters.firebolt",
    ),
    (
        "Airflow",
        "get_conn",
        Path("firebolt_provider/hooks/firebolt.py"),
        "firebolt_provider",
    ),
    # Airbyte connectors do not expose a version, hence the empty module name.
    (
        "AirbyteDestination",
        "establish_connection",
        Path("destination_firebolt/destination.py"),
        "",
    ),
    (
        "AirbyteDestination",
        "establish_async_connection",
        Path("destination_firebolt/destination.py"),
        "",
    ),
    (
        "AirbyteSource",
        "establish_connection",
        Path("source_firebolt/source.py"),
        "",
    ),
    (
        "AirbyteSource",
        "establish_async_connection",
        Path("source_firebolt/source.py"),
        "",
    ),
    (
        "SQLAlchemy",
        "connect",
        Path("sqlalchemy/engine/default.py"),
        "firebolt_db",
    ),
    (
        "FireboltCLI",
        "create_connection",
        Path("firebolt_cli/utils.py"),
        "firebolt_cli",
    ),
]


def _os_compare(file: Path, expected: Path) -> bool:
"""
System-independent path comparison.

Args:
file: file path to check against
expected: expected file path

Returns:
True if file ends with path
"""
return file.parts[-len(expected.parts) :] == expected.parts


def get_sdk_properties() -> Tuple[str, str, str, str]:
    """
    Collect the Python, SDK and OS versions of the running environment.

    Returns:
        Tuple of Python version, SDK version, OS name with release, and
        "ciso8601" if that module has been imported (empty string otherwise)
    """
    properties = (
        python_version(),
        __version__,
        f"{system()} {release()}",
        # Only report ciso8601 when it is already loaded in this process
        "ciso8601" if "ciso8601" in modules else "",
    )
    logger.debug("Python %s detected. SDK %s OS %s %s", *properties)
    return properties


def detect_connectors() -> Dict[str, str]:
    """
    Detect which connectors are running the code by parsing the stack.
    Exceptions are ignored since this is intended for logging only.

    Every frame of the current call stack is matched against CONNECTOR_MAP
    by function name and trailing file path. For a match, the connector
    version is read from the ``__version__`` attribute of the module listed
    in the map, or recorded as an empty string when no module is listed.

    Returns:
        Dictionary mapping detected connector names to their versions
    """
    connectors: Dict[str, str] = {}
    try:
        # context=0 skips loading source lines for every frame; only the
        # function and file names are used below.
        stack = inspect.stack(context=0)
    except Exception:
        # Stack inspection is best effort: tracking must never break callers
        logger.debug("Failed to inspect the call stack", exc_info=True)
        return connectors

    for frame in stack:
        try:
            for name, func, path, version_path in CONNECTOR_MAP:
                if frame.function != func:
                    continue
                if not _os_compare(Path(frame.filename), path):
                    continue
                if version_path:
                    module = import_module(version_path)
                    connectors[name] = module.__version__  # type: ignore
                else:
                    # Some connectors don't have versions specified
                    connectors[name] = ""
                # No need to carry on if connector is detected
                break
        except Exception:
            logger.debug(
                "Failed to extract version from %s in %s",
                frame.function,
                frame.filename,
                exc_info=True,
            )
    return connectors


def format_as_user_agent(connectors: Dict[str, str]) -> str:
    """
    Render SDK properties and the connector stack as a user-agent string.

    Args:
        connectors: Dictionary of connector to version mappings

    Returns:
        SDK description followed by one " name/version" entry per connector,
        in the iteration order of ``connectors``.
    """
    py_version, sdk_version, os_version, ciso = get_sdk_properties()
    pieces = [f"PythonSDK/{sdk_version} (Python {py_version}; {os_version}; {ciso})"]
    pieces.extend(f" {name}/{version}" for name, version in connectors.items())
    return "".join(pieces)


def get_user_agent_header(
    connector_versions: Optional[List[Tuple[str, str]]] = None,
) -> str:
    """
    Return a user agent header with connector stack and system information.

    Args:
        connector_versions(Optional): User-supplied list of tuples of all connectors
            and their versions intended for tracking. ``None`` (the default)
            is treated as an empty list.

    Returns:
        String representation of a user-agent tracking information
    """
    # Use None as the default instead of a shared mutable list, and accept an
    # explicit None as the Optional annotation promises.
    supplied = [] if connector_versions is None else connector_versions
    connectors = detect_connectors()
    logger.debug("Detected running from packages: %s", str(connectors))
    # Override auto-detected connectors with info provided manually;
    # ConnectorVersions validates the shape of the supplied data first.
    for name, version in ConnectorVersions(versions=supplied).versions:
        connectors[name] = version
    return format_as_user_agent(connectors)
15 changes: 15 additions & 0 deletions tests/integration/utils/sample_usage.model
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import sys

# Hack: drop sys.path[0] (this script's own directory) while importing, so a
# generated script named firebolt.py is not picked up as the firebolt package
old_path = sys.path
sys.path = sys.path[1:]
from firebolt.utils.usage_tracker import get_user_agent_header

# Back to old path for detection to work properly
sys.path = old_path


def {function_name}():
print(get_user_agent_header())

{function_name}()
77 changes: 77 additions & 0 deletions tests/integration/utils/test_usage_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import os
from pathlib import Path
from shutil import rmtree
from subprocess import PIPE, run

from pytest import fixture, mark

# Scratch directory (relative to the working directory) that holds the
# generated connector scripts and mock modules; the trailing slash matters
# because paths are built by string concatenation.
TEST_FOLDER = "tmp_test_code/"
# Template script that is formatted with a function name for each test case.
TEST_SCRIPT_MODEL = "tests/integration/utils/sample_usage.model"


# Fake modules written into TEST_FOLDER, each containing only a
# ``__version__`` assignment numbered by its position in this list.
MOCK_MODULES = [
    "firebolt_cli/firebolt_cli.py",
    "sqlalchemy/engine/firebolt_db.py",
    "firebolt_provider/hooks/firebolt_provider.py",
    "dbt/adapters/firebolt/dbt/adapters/firebolt.py",
]


@fixture(scope="module", autouse=True)
def create_cli_mock():
    """Create mock connector modules for the test module, then remove them.

    Each entry of MOCK_MODULES is written under TEST_FOLDER with a
    ``__version__`` of ``1.0.<index>``. The whole folder is removed on
    teardown, including when setup itself fails part-way through.
    """
    root = Path(TEST_FOLDER)
    try:
        for i, file in enumerate(MOCK_MODULES):
            module_file = root / file
            # exist_ok keeps a stale folder from an aborted run from
            # failing the whole test module
            module_file.parent.mkdir(parents=True, exist_ok=True)
            module_file.write_text(f"__version__ = '1.0.{i}'")
        # Additional setup for proper dbt import
        dbt_package = root / "dbt" / "adapters" / "firebolt" / "dbt"
        (dbt_package / "__init__.py").touch()
        (dbt_package / "adapters" / "__init__.py").touch()
        yield
    finally:
        # ignore_errors covers the case where setup failed before the
        # folder was created
        rmtree(TEST_FOLDER, ignore_errors=True)


@fixture(scope="module")
def test_model():
    """Return the text of the script template at TEST_SCRIPT_MODEL."""
    return Path(TEST_SCRIPT_MODEL).read_text()


def create_test_file(code: str, function_name: str, file_path: str) -> None:
    """Render a script template and write it to ``file_path``.

    Args:
        code: template text containing ``{function_name}`` placeholders
        function_name: name substituted into the template
        file_path: destination file; missing parent directories are created

    Raises:
        KeyError: if the template contains placeholders other than
            ``function_name``
    """
    rendered = code.format(function_name=function_name)
    target = Path(file_path)
    # Path.parent of a bare file name is ".", so this also works when
    # file_path has no directory part (os.makedirs("") would raise).
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(rendered)


@mark.parametrize(
    "function,path,expected",
    [
        ("create_connection", "firebolt_cli/utils.py", "FireboltCLI/1.0.0"),
        ("connect", "sqlalchemy/engine/default.py", "SQLAlchemy/1.0.1"),
        ("establish_connection", "source_firebolt/source.py", "AirbyteSource/"),
        ("establish_async_connection", "source_firebolt/source.py", "AirbyteSource/"),
        (
            "establish_connection",
            "destination_firebolt/destination.py",
            "AirbyteDestination/",
        ),
        (
            "establish_async_connection",
            "destination_firebolt/destination.py",
            "AirbyteDestination/",
        ),
        ("get_conn", "firebolt_provider/hooks/firebolt.py", "Airflow/1.0.2"),
        ("open", "dbt/adapters/firebolt/connections.py", "DBT/1.0.3"),
    ],
)
def test_usage_detection(function, path, expected, test_model):
    """Run a generated connector script and check the reported user agent.

    The template is rendered into ``TEST_FOLDER + path`` with ``function`` as
    the calling function name, executed in a subprocess, and its stdout must
    contain the ``expected`` connector entry while stderr stays empty.
    """
    # Local import keeps this test self-contained within the module.
    import sys

    test_path = TEST_FOLDER + path
    create_test_file(test_model, function, test_path)
    result = run(
        # Use the interpreter running pytest rather than whatever "python3"
        # resolves to on PATH, which may be missing or lack the SDK.
        [sys.executable, test_path],
        stdout=PIPE,
        stderr=PIPE,
        env={"PYTHONPATH": os.getenv("PYTHONPATH", ""), "PATH": os.getenv("PATH", "")},
    )
    assert not result.stderr, result.stderr.decode("utf-8", errors="replace")
    assert expected in result.stdout.decode("utf-8")
Loading