diff --git a/packages/cubejs-backend-native/js/index.ts b/packages/cubejs-backend-native/js/index.ts index 002b5060a9129..c9ef5771a5c84 100644 --- a/packages/cubejs-backend-native/js/index.ts +++ b/packages/cubejs-backend-native/js/index.ts @@ -313,6 +313,7 @@ export const shutdownInterface = async (instance: SqlInterfaceInstance): Promise export interface PyConfiguration { repositoryFactory?: (ctx: unknown) => Promise, + logger?: (msg: string, params: Record) => void, checkAuth?: (req: unknown, authorization: string) => Promise queryRewrite?: (query: unknown, ctx: unknown) => Promise contextToApiScopes?: () => Promise @@ -348,6 +349,15 @@ export const pythonLoadConfig = async (content: string, options: { fileName: str }); } + if (config.logger) { + const nativeLogger = config.logger; + config.logger = (msg: string, params: Record) => { + nativeLogger(msg, params).catch((e: any) => { + console.error(e); + }); + }; + } + return config; }; diff --git a/packages/cubejs-backend-native/python/cube/src/conf/__init__.py b/packages/cubejs-backend-native/python/cube/src/conf/__init__.py index 7a3a0a2833531..257d828a2806b 100644 --- a/packages/cubejs-backend-native/python/cube/src/conf/__init__.py +++ b/packages/cubejs-backend-native/python/cube/src/conf/__init__.py @@ -1,5 +1,5 @@ import os -from typing import Union, Callable, Dict +from typing import Union, Callable, Dict, Any def file_repository(path): @@ -31,22 +31,39 @@ class RequestContext: class Configuration: + web_sockets: bool + http: Dict + graceful_shutdown: int + process_subscriptions_interval: int + web_sockets_base_path: str schema_path: str base_path: str - web_sockets_base_path: str - compiler_cache_size: int - telemetry: bool - pg_sql_port: int + dev_server: bool + api_secret: str cache_and_queue_driver: str allow_js_duplicate_props_in_schema: bool - process_subscriptions_interval: int - http: Dict jwt: Dict + scheduled_refresh_timer: Any + scheduled_refresh_timezones: list[str] + scheduled_refresh_concurrency: int + scheduled_refresh_batch_size: int + compiler_cache_size: int + update_compiler_cache_keep_alive: bool + max_compiler_cache_keep_alive: int + telemetry: bool + sql_cache: bool + live_preview: bool + # SQL API + pg_sql_port: int + sql_super_user: str + sql_user: str + sql_password: str # Functions logger: Callable context_to_app_id: Union[str, Callable[[RequestContext], str]] context_to_orchestrator_id: Union[str, Callable[[RequestContext], str]] - driver_factory: Callable + driver_factory: Callable[[RequestContext], Dict] + external_driver_factory: Callable[[RequestContext], Dict] db_type: Union[str, Callable[[RequestContext], str]] check_auth: Callable check_sql_auth: Callable @@ -61,22 +78,38 @@ class Configuration: orchestrator_options: Union[Dict, Callable[[RequestContext], Dict]] def __init__(self): + self.web_sockets = None + self.http = None + self.graceful_shutdown = None self.schema_path = None self.base_path = None + self.dev_server = None + self.api_secret = None self.web_sockets_base_path = None - self.compiler_cache_size = None - self.telemetry = None self.pg_sql_port = None self.cache_and_queue_driver = None self.allow_js_duplicate_props_in_schema = None self.process_subscriptions_interval = None - self.http = None self.jwt = None + self.scheduled_refresh_timer = None + self.scheduled_refresh_timezones = None + self.scheduled_refresh_concurrency = None + self.scheduled_refresh_batch_size = None + self.compiler_cache_size = None + self.update_compiler_cache_keep_alive = None + self.max_compiler_cache_keep_alive = None + self.telemetry = None + self.sql_cache = None + self.live_preview = None + self.sql_super_user = None + self.sql_user = None + self.sql_password = None # Functions self.logger = None self.context_to_app_id = None self.context_to_orchestrator_id = None self.driver_factory = None + self.external_driver_factory = None self.db_type = None self.check_auth = None self.check_sql_auth = None @@ -91,89 +124,15 @@ def __init__(self): self.pre_aggregations_schema = None self.orchestrator_options = None - def set_schema_path(self, schema_path: str): - self.schema_path = schema_path - - def set_base_path(self, base_path: str): - self.base_path = base_path - - def set_web_sockets_base_path(self, web_sockets_base_path: str): - self.web_sockets_base_path = web_sockets_base_path - - def set_compiler_cache_size(self, compiler_cache_size: int): - self.compiler_cache_size = compiler_cache_size - - def set_telemetry(self, telemetry: bool): - self.telemetry = telemetry - - def set_pg_sql_port(self, pg_sql_port: int): - self.pg_sql_port = pg_sql_port - - def set_cache_and_queue_driver(self, cache_and_queue_driver: str): - self.cache_and_queue_driver = cache_and_queue_driver - - def set_allow_js_duplicate_props_in_schema(self, allow_js_duplicate_props_in_schema: bool): - self.allow_js_duplicate_props_in_schema = allow_js_duplicate_props_in_schema - - def set_process_subscriptions_interval(self, process_subscriptions_interval: int): - self.process_subscriptions_interval = process_subscriptions_interval - - def set_logger(self, logger: Callable): - self.logger = logger - - def set_context_to_app_id(self, context_to_app_id: Union[str, Callable[[RequestContext], str]]): - self.context_to_app_id = context_to_app_id - - def set_context_to_orchestrator_id(self, context_to_orchestrator_id: Union[str, Callable[[RequestContext], str]]): - self.context_to_orchestrator_id = context_to_orchestrator_id - - def set_driver_factory(self, driver_factory: Callable): - self.driver_factory = driver_factory - - def set_db_type(self, db_type: Union[str, Callable[[RequestContext], str]]): - self.db_type = db_type - - def set_check_auth(self, check_auth: Callable): - self.check_auth = check_auth - - def set_check_sql_auth(self, check_sql_auth: Callable): - self.check_sql_auth = check_sql_auth - - def set_can_switch_sql_user(self, can_switch_sql_user: Callable): - self.can_switch_sql_user = can_switch_sql_user - - def set_query_rewrite(self, query_rewrite: Callable): - self.query_rewrite = query_rewrite - - def set_extend_context(self, extend_context: Callable[[RequestContext], Dict]): - self.extend_context = extend_context - - def set_scheduled_refresh_contexts(self, scheduled_refresh_contexts: Callable): - self.scheduled_refresh_contexts = scheduled_refresh_contexts - - def set_repository_factory(self, repository_factory: Callable): - self.repository_factory = repository_factory - - def set_schema_version(self, schema_version: Union[str, Callable[[RequestContext], str]]): - self.schema_version = schema_version - - def set_semantic_layer_sync(self, semantic_layer_sync: Union[Dict, Callable[[], Dict]]): - self.semantic_layer_sync = semantic_layer_sync - - def set_pre_aggregations_schema(self, pre_aggregations_schema: Union[str, Callable[[RequestContext], str]]): - self.pre_aggregations_schema = pre_aggregations_schema - - def set_orchestrator_options(self, orchestrator_options: Union[Dict, Callable[[RequestContext], Dict]]): - self.orchestrator_options = orchestrator_options - - -settings = Configuration() + def __call__(self, func): + if not callable(func): + raise ConfigurationException("@config decorator must be used with functions, actual: '%s'" % type(func).__name__) -def config(func): - if not callable(func): - raise ConfigurationException("@config decorator must be used with functions, actual: '%s'" % type(func).__name__) + if hasattr(self, func.__name__): + setattr(self, func.__name__, func) + else: + raise ConfigurationException("Unknown configuration property: '%s'" % func.__name__) - if hasattr(settings, func.__name__): - setattr(settings, func.__name__, func) - else: - raise ConfigurationException("Unknown settings property: '%s'" % func.__name__) +config = Configuration() +# backward compatibility +settings = config \ No newline at end of file diff --git a/packages/cubejs-backend-native/src/python/cube_config.rs b/packages/cubejs-backend-native/src/python/cube_config.rs index 3a7aa423e18d7..56f5b9da76cc3 100644 --- a/packages/cubejs-backend-native/src/python/cube_config.rs +++ b/packages/cubejs-backend-native/src/python/cube_config.rs @@ -15,44 +15,56 @@ impl CubeConfigPy { } } - pub fn get_static_attrs(&self) -> Vec<&'static str> { + pub fn get_attrs(&self) -> Vec<&'static str> { vec![ + "web_sockets", + "http", + "graceful_shutdown", + "process_subscriptions_interval", + "web_sockets_base_path", "schema_path", "base_path", - "web_sockets_base_path", - "compiler_cache_size", - "telemetry", - "pg_sql_port", + "dev_server", + "api_secret", "cache_and_queue_driver", "allow_js_duplicate_props_in_schema", - "process_subscriptions_interval", - "http", "jwt", + "scheduled_refresh_timer", + "scheduled_refresh_timezones", + "scheduled_refresh_concurrency", + "scheduled_refresh_batch_size", + "compiler_cache_size", + "update_compiler_cache_keep_alive", + "max_compiler_cache_keep_alive", + "telemetry", + "sql_cache", + "live_preview", + "pg_sql_port", + "sql_super_user", + "sql_user", + "sql_password", + // functions + "logger", + "context_to_app_id", + "context_to_orchestrator_id", + "driver_factory", + "external_driver_factory", + "db_type", + "check_auth", + "check_sql_auth", + "can_switch_sql_user", + "query_rewrite", + "extend_context", + "scheduled_refresh_contexts", + "context_to_api_scopes", + "repository_factory", + "semantic_layer_sync", + "schema_version", + "pre_aggregations_schema", + "orchestrator_options", ] } - pub fn apply_dynamic_functions(&mut self, config_module: &PyAny) -> PyResult<()> { - self.attr(config_module, "logger")?; - self.attr(config_module, "context_to_app_id")?; - self.attr(config_module, "context_to_orchestrator_id")?; - self.attr(config_module, "driver_factory")?; - self.attr(config_module, "db_type")?; - self.attr(config_module, "check_auth")?; - self.attr(config_module, "check_sql_auth")?; - self.attr(config_module, "can_switch_sql_user")?; - self.attr(config_module, "query_rewrite")?; - self.attr(config_module, "extend_context")?; - self.attr(config_module, "scheduled_refresh_contexts")?; - self.attr(config_module, "context_to_api_scopes")?; - self.attr(config_module, "repository_factory")?; - self.attr(config_module, "semantic_layer_sync")?; - self.attr(config_module, "schema_version")?; - self.attr(config_module, "pre_aggregations_schema")?; - self.attr(config_module, "orchestrator_options")?; - - Ok(()) - } - pub fn attr(&mut self, config_module: &PyAny, key: &str) -> PyResult<()> { let v = config_module.getattr(&*key)?; if !v.is_none() { diff --git a/packages/cubejs-backend-native/src/python/entry.rs b/packages/cubejs-backend-native/src/python/entry.rs index f43b9a818f33e..9a84772099e1a 100644 --- a/packages/cubejs-backend-native/src/python/entry.rs +++ b/packages/cubejs-backend-native/src/python/entry.rs @@ -30,18 +30,19 @@ fn python_load_config(mut cx: FunctionContext) -> JsResult { let settings_py = if config_module.hasattr("__execution_context_locals")? { let execution_context_locals = config_module.getattr("__execution_context_locals")?; execution_context_locals.get_item("settings")? + } else if config_module.hasattr("config")? { + config_module.getattr("config")? } else { + // backward compatibility config_module.getattr("settings")? }; let mut cube_conf = CubeConfigPy::new(); - for attr_name in cube_conf.get_static_attrs() { + for attr_name in cube_conf.get_attrs() { cube_conf.attr(settings_py, attr_name)?; } - cube_conf.apply_dynamic_functions(settings_py)?; - Ok(cube_conf) }); diff --git a/packages/cubejs-backend-native/test/config.py b/packages/cubejs-backend-native/test/config.py index 8a9637670ec2a..c7cc7e1e27a42 100644 --- a/packages/cubejs-backend-native/test/config.py +++ b/packages/cubejs-backend-native/test/config.py @@ -1,12 +1,11 @@ from cube.conf import ( config, - settings, file_repository ) -settings.schema_path = "models" -settings.pg_sql_port = 5555 -settings.telemetry = False +config.schema_path = "models" +config.pg_sql_port = 5555 +config.telemetry = False @config def query_rewrite(query, ctx): @@ -27,3 +26,19 @@ async def repository_factory(ctx): async def context_to_api_scopes(): print('[python] context_to_api_scopes') return ['meta', 'data', 'jobs'] + +@config +def schema_version(ctx): + print('[python] schema_version', ctx) + + return '1' + +@config +def pre_aggregations_schema(ctx): + print('[python] pre_aggregations_schema', ctx) + + return 'schema' + +@config +def logger(msg, params): + print('[python] logger msg', msg, 'params=', params) diff --git a/packages/cubejs-backend-native/test/old-config.py b/packages/cubejs-backend-native/test/old-config.py new file mode 100644 index 0000000000000..faadd64ef5eef --- /dev/null +++ b/packages/cubejs-backend-native/test/old-config.py @@ -0,0 +1,51 @@ +from cube.conf import ( + settings, + file_repository +) + +settings.schema_path = "models" +settings.pg_sql_port = 5555 +settings.telemetry = False + +def query_rewrite(query, ctx): + print('[python] query_rewrite query=', query, ' ctx=', ctx) + return query + +settings.query_rewrite = query_rewrite + +async def check_auth(req, authorization): + print('[python] check_auth req=', req, ' authorization=', authorization) + +settings.check_auth = check_auth + +async def repository_factory(ctx): + print('[python] repository_factory ctx=', ctx) + + return file_repository(ctx['securityContext']['schemaPath']) + +settings.repository_factory = repository_factory + +async def context_to_api_scopes(): + print('[python] context_to_api_scopes') + return ['meta', 'data', 'jobs'] + +settings.context_to_api_scopes = context_to_api_scopes + +def schema_version(ctx): + print('[python] schema_version', ctx) + + return '1' + +settings.schema_version = schema_version + +def pre_aggregations_schema(ctx): + print('[python] pre_aggregations_schema', ctx) + + return 'schema' + +settings.pre_aggregations_schema = pre_aggregations_schema + +def logger(msg, params): + print('[python] logger msg', msg, 'params=', params) + +settings.logger = logger diff --git a/packages/cubejs-backend-native/test/python.test.ts b/packages/cubejs-backend-native/test/python.test.ts index d489177ef9c37..4c2c9be919b1f 100644 --- a/packages/cubejs-backend-native/test/python.test.ts +++ b/packages/cubejs-backend-native/test/python.test.ts @@ -37,12 +37,15 @@ suite('Python Config', () => { test('async checkAuth', async () => { expect(config).toEqual({ schemaPath: 'models', - pgSqlPort: 5555, telemetry: false, contextToApiScopes: expect.any(Function), + logger: expect.any(Function), + pgSqlPort: 5555, + preAggregationsSchema: expect.any(Function), checkAuth: expect.any(Function), queryRewrite: expect.any(Function), repositoryFactory: expect.any(Function), + schemaVersion: expect.any(Function), }); if (!config.checkAuth) { @@ -126,6 +129,33 @@ suite('Python Config', () => { }); }); +darwinSuite('Old Python Config', () => { + test('test', async () => { + const config = await loadConfigurationFile('old-config.py'); + expect(config).toEqual({ + schemaPath: 'models', + telemetry: false, + contextToApiScopes: expect.any(Function), + logger: expect.any(Function), + pgSqlPort: 5555, + preAggregationsSchema: expect.any(Function), + checkAuth: expect.any(Function), + queryRewrite: expect.any(Function), + repositoryFactory: expect.any(Function), + schemaVersion: expect.any(Function), + }); + + if (!config.checkAuth) { + throw new Error('checkAuth was not defined in config.py'); + } + + await config.checkAuth( + { requestId: 'test' }, + 'MY_SECRET_TOKEN' + ); + }); +}); + darwinSuite('Scoped Python Config', () => { test('test', async () => { const config = await loadConfigurationFile('scoped-config.py');