Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions packages/cubejs-backend-native/js/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ export const shutdownInterface = async (instance: SqlInterfaceInstance): Promise

export interface PyConfiguration {
repositoryFactory?: (ctx: unknown) => Promise<unknown>,
logger?: (msg: string, params: Record<string, any>) => void,
checkAuth?: (req: unknown, authorization: string) => Promise<void>
queryRewrite?: (query: unknown, ctx: unknown) => Promise<unknown>
contextToApiScopes?: () => Promise<string[]>
Expand Down Expand Up @@ -348,6 +349,15 @@ export const pythonLoadConfig = async (content: string, options: { fileName: str
});
}

if (config.logger) {
const nativeLogger = config.logger;
config.logger = (msg: string, params: Record<string, any>) => {
nativeLogger(msg, params).catch((e: any) => {
console.error(e);
});
};
}

return config;
};

Expand Down
149 changes: 54 additions & 95 deletions packages/cubejs-backend-native/python/cube/src/conf/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from typing import Union, Callable, Dict
from typing import Union, Callable, Dict, Any


def file_repository(path):
Expand Down Expand Up @@ -31,22 +31,39 @@ class RequestContext:


class Configuration:
web_sockets: bool
http: Dict
graceful_shutdown: int
process_subscriptions_interval: int
web_sockets_base_path: str
schema_path: str
base_path: str
web_sockets_base_path: str
compiler_cache_size: int
telemetry: bool
pg_sql_port: int
dev_server: bool
api_secret: str
cache_and_queue_driver: str
allow_js_duplicate_props_in_schema: bool
process_subscriptions_interval: int
http: Dict
jwt: Dict
scheduled_refresh_timer: Any
scheduled_refresh_timezones: list[str]
scheduled_refresh_concurrency: int
scheduled_refresh_batch_size: int
compiler_cache_size: int
update_compiler_cache_keep_alive: bool
max_compiler_cache_keep_alive: int
telemetry: bool
sql_cache: bool
live_preview: bool
# SQL API
pg_sql_port: int
sql_super_user: str
sql_user: str
sql_password: str
# Functions
logger: Callable
context_to_app_id: Union[str, Callable[[RequestContext], str]]
context_to_orchestrator_id: Union[str, Callable[[RequestContext], str]]
driver_factory: Callable
driver_factory: Callable[[RequestContext], Dict]
external_driver_factory: Callable[[RequestContext], Dict]
db_type: Union[str, Callable[[RequestContext], str]]
check_auth: Callable
check_sql_auth: Callable
Expand All @@ -61,22 +78,38 @@ class Configuration:
orchestrator_options: Union[Dict, Callable[[RequestContext], Dict]]

def __init__(self):
self.web_sockets = None
self.http = None
self.graceful_shutdown = None
self.schema_path = None
self.base_path = None
self.dev_server = None
self.api_secret = None
self.web_sockets_base_path = None
self.compiler_cache_size = None
self.telemetry = None
self.pg_sql_port = None
self.cache_and_queue_driver = None
self.allow_js_duplicate_props_in_schema = None
self.process_subscriptions_interval = None
self.http = None
self.jwt = None
self.scheduled_refresh_timer = None
self.scheduled_refresh_timezones = None
self.scheduled_refresh_concurrency = None
self.scheduled_refresh_batch_size = None
self.compiler_cache_size = None
self.update_compiler_cache_keep_alive = None
self.max_compiler_cache_keep_alive = None
self.telemetry = None
self.sql_cache = None
self.live_preview = None
self.sql_super_user = None
self.sql_user = None
self.sql_password = None
# Functions
self.logger = None
self.context_to_app_id = None
self.context_to_orchestrator_id = None
self.driver_factory = None
self.external_driver_factory = None
self.db_type = None
self.check_auth = None
self.check_sql_auth = None
Expand All @@ -91,89 +124,15 @@ def __init__(self):
self.pre_aggregations_schema = None
self.orchestrator_options = None

def set_schema_path(self, schema_path: str):
self.schema_path = schema_path

def set_base_path(self, base_path: str):
self.base_path = base_path

def set_web_sockets_base_path(self, web_sockets_base_path: str):
self.web_sockets_base_path = web_sockets_base_path

def set_compiler_cache_size(self, compiler_cache_size: int):
self.compiler_cache_size = compiler_cache_size

def set_telemetry(self, telemetry: bool):
self.telemetry = telemetry

def set_pg_sql_port(self, pg_sql_port: int):
self.pg_sql_port = pg_sql_port

def set_cache_and_queue_driver(self, cache_and_queue_driver: str):
self.cache_and_queue_driver = cache_and_queue_driver

def set_allow_js_duplicate_props_in_schema(self, allow_js_duplicate_props_in_schema: bool):
self.allow_js_duplicate_props_in_schema = allow_js_duplicate_props_in_schema

def set_process_subscriptions_interval(self, process_subscriptions_interval: int):
self.process_subscriptions_interval = process_subscriptions_interval

def set_logger(self, logger: Callable):
self.logger = logger

def set_context_to_app_id(self, context_to_app_id: Union[str, Callable[[RequestContext], str]]):
self.context_to_app_id = context_to_app_id

def set_context_to_orchestrator_id(self, context_to_orchestrator_id: Union[str, Callable[[RequestContext], str]]):
self.context_to_orchestrator_id = context_to_orchestrator_id

def set_driver_factory(self, driver_factory: Callable):
self.driver_factory = driver_factory

def set_db_type(self, db_type: Union[str, Callable[[RequestContext], str]]):
self.db_type = db_type

def set_check_auth(self, check_auth: Callable):
self.check_auth = check_auth

def set_check_sql_auth(self, check_sql_auth: Callable):
self.check_sql_auth = check_sql_auth

def set_can_switch_sql_user(self, can_switch_sql_user: Callable):
self.can_switch_sql_user = can_switch_sql_user

def set_query_rewrite(self, query_rewrite: Callable):
self.query_rewrite = query_rewrite

def set_extend_context(self, extend_context: Callable[[RequestContext], Dict]):
self.extend_context = extend_context

def set_scheduled_refresh_contexts(self, scheduled_refresh_contexts: Callable):
self.scheduled_refresh_contexts = scheduled_refresh_contexts

def set_repository_factory(self, repository_factory: Callable):
self.repository_factory = repository_factory

def set_schema_version(self, schema_version: Union[str, Callable[[RequestContext], str]]):
self.schema_version = schema_version

def set_semantic_layer_sync(self, semantic_layer_sync: Union[Dict, Callable[[], Dict]]):
self.semantic_layer_sync = semantic_layer_sync

def set_pre_aggregations_schema(self, pre_aggregations_schema: Union[str, Callable[[RequestContext], str]]):
self.pre_aggregations_schema = pre_aggregations_schema

def set_orchestrator_options(self, orchestrator_options: Union[Dict, Callable[[RequestContext], Dict]]):
self.orchestrator_options = orchestrator_options


settings = Configuration()
def __call__(self, func):
if not callable(func):
raise ConfigurationException("@config decorator must be used with functions, actual: '%s'" % type(func).__name__)

def config(func):
if not callable(func):
raise ConfigurationException("@config decorator must be used with functions, actual: '%s'" % type(func).__name__)
if hasattr(self, func.__name__):
setattr(self, func.__name__, func)
else:
raise ConfigurationException("Unknown configuration property: '%s'" % func.__name__)

if hasattr(settings, func.__name__):
setattr(settings, func.__name__, func)
else:
raise ConfigurationException("Unknown settings property: '%s'" % func.__name__)
config = Configuration()
# backward compatibility
settings = config
70 changes: 41 additions & 29 deletions packages/cubejs-backend-native/src/python/cube_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,56 @@ impl CubeConfigPy {
}
}

pub fn get_static_attrs(&self) -> Vec<&'static str> {
pub fn get_attrs(&self) -> Vec<&'static str> {
vec![
"web_sockets",
"http",
"graceful_shutdown",
"process_subscriptions_interval",
"web_sockets_base_path",
"schema_path",
"base_path",
"web_sockets_base_path",
"compiler_cache_size",
"telemetry",
"pg_sql_port",
"dev_server",
"api_secret",
"cache_and_queue_driver",
"allow_js_duplicate_props_in_schema",
"process_subscriptions_interval",
"http",
"jwt",
"scheduled_refresh_timer",
"scheduled_refresh_timezones",
"scheduled_refresh_concurrency",
"scheduled_refresh_batch_size",
"compiler_cache_size",
"update_compiler_cache_keep_alive",
"max_compiler_cache_keep_alive",
"telemetry",
"sql_cache",
"live_preview",
"pg_sql_port",
"sql_super_user",
"sql_user",
"sql_password",
// functions
"logger",
"context_to_app_id",
"context_to_orchestrator_id",
"driver_factory",
"external_driver_factory",
"db_type",
"check_auth",
"check_sql_auth",
"can_switch_sql_user",
"query_rewrite",
"extend_context",
"scheduled_refresh_contexts",
"context_to_api_scopes",
"repository_factory",
"semantic_layer_sync",
"schema_version",
"pre_aggregations_schema",
"orchestrator_options",
]
}

pub fn apply_dynamic_functions(&mut self, config_module: &PyAny) -> PyResult<()> {
self.attr(config_module, "logger")?;
self.attr(config_module, "context_to_app_id")?;
self.attr(config_module, "context_to_orchestrator_id")?;
self.attr(config_module, "driver_factory")?;
self.attr(config_module, "db_type")?;
self.attr(config_module, "check_auth")?;
self.attr(config_module, "check_sql_auth")?;
self.attr(config_module, "can_switch_sql_user")?;
self.attr(config_module, "query_rewrite")?;
self.attr(config_module, "extend_context")?;
self.attr(config_module, "scheduled_refresh_contexts")?;
self.attr(config_module, "context_to_api_scopes")?;
self.attr(config_module, "repository_factory")?;
self.attr(config_module, "semantic_layer_sync")?;
self.attr(config_module, "schema_version")?;
self.attr(config_module, "pre_aggregations_schema")?;
self.attr(config_module, "orchestrator_options")?;

Ok(())
}

pub fn attr(&mut self, config_module: &PyAny, key: &str) -> PyResult<()> {
let v = config_module.getattr(&*key)?;
if !v.is_none() {
Expand Down
7 changes: 4 additions & 3 deletions packages/cubejs-backend-native/src/python/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,19 @@ fn python_load_config(mut cx: FunctionContext) -> JsResult<JsPromise> {
let settings_py = if config_module.hasattr("__execution_context_locals")? {
let execution_context_locals = config_module.getattr("__execution_context_locals")?;
execution_context_locals.get_item("settings")?
} else if config_module.hasattr("config")? {
config_module.getattr("config")?
} else {
// backward compatibility
config_module.getattr("settings")?
};

let mut cube_conf = CubeConfigPy::new();

for attr_name in cube_conf.get_static_attrs() {
for attr_name in cube_conf.get_attrs() {
cube_conf.attr(settings_py, attr_name)?;
}

cube_conf.apply_dynamic_functions(settings_py)?;

Ok(cube_conf)
});

Expand Down
23 changes: 19 additions & 4 deletions packages/cubejs-backend-native/test/config.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from cube.conf import (
config,
settings,
file_repository
)

settings.schema_path = "models"
settings.pg_sql_port = 5555
settings.telemetry = False
config.schema_path = "models"
config.pg_sql_port = 5555
config.telemetry = False

@config
def query_rewrite(query, ctx):
Expand All @@ -27,3 +26,19 @@ async def repository_factory(ctx):
async def context_to_api_scopes():
print('[python] context_to_api_scopes')
return ['meta', 'data', 'jobs']

@config
def schema_version(ctx):
print('[python] schema_version', ctx)

return '1'

@config
def pre_aggregations_schema(ctx):
print('[python] pre_aggregations_schema', ctx)

return 'schema'

@config
def logger(msg, params):
print('[python] logger msg', msg, 'params=', params)
Loading