Conversation
…ore `dataflow init` issue

Fix #17
Pull request overview
This PR refactors the registry initialization system to solve issue #17 regarding import-before-initialization problems. The main change introduces an AppContainer class that centralizes and controls the initialization order of all registry singletons (dataset, operator, pipeline, prompt, serving, and task registries), preventing issues caused by uncontrolled module-level initialization.
Key changes:
- Created an `AppContainer` class to manage lazy initialization of all registries with explicit ordering
- Removed all module-level singleton instances (`_DATASET_REGISTRY`, `_PIPELINE_REGISTRY`, etc.) and replaced them with the `container.<registry_name>` access pattern
- Moved `VisualizeDatasetService` from a separate file into `dataset_registry.py` and added it to the container
- Moved dataset scanning logic from `dataflow_setup.py` into `DatasetRegistry.scan_all_datasets()`, which is called during initialization
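The container pattern the PR introduces can be sketched as follows. This is a minimal illustration of the wiring described in the summary above, not the PR's actual code: the registry bodies and constructor signatures are stand-ins, and only the registry names come from the PR.

```python
# Illustrative stand-ins for the real registries; only the wiring
# pattern matters here, not the bodies.
class DatasetRegistry:
    pass

class OperatorRegistry:
    pass

class PipelineRegistry:
    def __init__(self, operator_registry, dataset_registry):
        # Pipelines depend on operators and datasets, so they are built last.
        self.operator_registry = operator_registry
        self.dataset_registry = dataset_registry

class AppContainer:
    def __init__(self):
        # Nothing is constructed at import time; attributes stay None
        # until init() is called explicitly during app startup.
        self.dataset_registry = None
        self.operator_registry = None
        self.pipeline_registry = None

    def init(self):
        # Explicit ordering: dependencies are created before dependents.
        self.dataset_registry = DatasetRegistry()
        self.operator_registry = OperatorRegistry()
        self.pipeline_registry = PipelineRegistry(
            self.operator_registry, self.dataset_registry
        )

# Module-level instance; callers use container.<registry_name> after init().
container = AppContainer()
```

The key property is that importing this module creates no registries; construction happens only inside `init()`, in a controlled order.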
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| `backend/app/core/container.py` | New file introducing `AppContainer` class for centralized registry management with explicit initialization order |
| `backend/app/main.py` | Added `container.init()` call after dataflow setup to initialize all registries |
| `backend/app/core/config.py` | Updated registry file paths: renamed `DATA_REGISTRY` path and added comment to `OPS_JSON_PATH` |
| `backend/app/core/dataflow_setup.py` | Removed dataset registration logic (moved to `DatasetRegistry.scan_all_datasets()`) |
| `backend/app/services/dataset_registry.py` | Merged `VisualizeDatasetService` into this file, added `scan_all_datasets()` method called in constructor, added logging statements |
| `backend/app/services/pipeline_registry.py` | Removed global singleton, updated to use `container.operator_registry` and `container.dataset_registry`, added logging and debug print statement |
| `backend/app/services/operator_registry.py` | Removed global `_op_registry` singleton instance |
| `backend/app/services/prompt_registry.py` | Removed global `_PROMPT_REGISTRY` singleton instance |
| `backend/app/services/serving_registry.py` | Removed global `_SERVING_REGISTRY` singleton instance |
| `backend/app/services/task_registry.py` | Removed global `_TASK_REGISTRY` singleton instance |
| `backend/app/services/dataflow_engine.py` | Updated to use `container.serving_registry` and `container.dataset_registry` instead of direct imports |
| `backend/app/services/visualize_dataset.py` | Deleted file (contents moved to `dataset_registry.py`) |
| `backend/app/api/v1/endpoints/tasks.py` | Updated all references from `_TASK_REGISTRY` to `container.task_registry` |
| `backend/app/api/v1/endpoints/serving.py` | Updated all references from `_SERVING_REGISTRY` and `_PROMPT_REGISTRY` to use container |
| `backend/app/api/v1/endpoints/prompts.py` | Updated all references from `_PROMPT_REGISTRY` to `container.prompt_registry` |
| `backend/app/api/v1/endpoints/pipelines.py` | Updated all references from `_PIPELINE_REGISTRY` to `container.pipeline_registry` |
| `backend/app/api/v1/endpoints/operators.py` | Updated all references from `_op_registry` to `container.operator_registry` |
| `backend/app/api/v1/endpoints/datasets.py` | Updated all references from `_DATASET_REGISTRY` and `_visualize_service` to use container |
```python
# logging.info(f"Registered dataset from {relative_path}.")
```

This commented-out logging statement should be removed. It appears to be leftover debug code that is no longer needed.

Suggested change:
```diff
- # logging.info(f"Registered dataset from {relative_path}.")
```
```python
self.scan_all_datasets()
```

The `DatasetRegistry` constructor now calls `scan_all_datasets()` during initialization, which performs I/O (filesystem scanning and registry updates). This side effect during object construction can cause issues:
- It makes container initialization slower, and can make it fail outright if the directory doesn't exist
- It violates the principle of separating object construction from initialization logic
- It makes testing more difficult

Consider moving this call to a separate initialization method or lazy-loading the datasets on first access.
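Either variant of the suggested fix can be sketched as below. The `load()` method name and the stand-in `scan_all_datasets()` body are hypothetical, not from the PR:

```python
class DatasetRegistry:
    def __init__(self):
        # Constructor does no I/O; the scan is deferred.
        self._datasets = None

    def scan_all_datasets(self):
        # Stand-in for the real filesystem scan.
        return []

    def load(self):
        # Explicit, idempotent initialization step: the container calls it
        # once during init(), and unit tests can skip it entirely.
        if self._datasets is None:
            self._datasets = self.scan_all_datasets()
        return self._datasets

    @property
    def datasets(self):
        # Lazy alternative: the first access triggers the scan.
        return self.load()
```

With this shape, constructing the registry is cheap and cannot fail on a missing directory; only `load()` (or the first `datasets` access) touches the filesystem.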
```python
        return 0

    def list(self) -> List[Dict]:
        logger.info("开始读取所有数据集信息")
```

This logging statement was added but provides minimal value. The message "开始读取所有数据集信息" ("Starting to read all dataset information") is logged every time the `list()` method is called, which could be quite frequent. Consider removing this log or changing it to debug level to reduce noise in production logs.

Suggested change:
```diff
- logger.info("开始读取所有数据集信息")
+ logger.debug("开始读取所有数据集信息")
```
|
|
```python
def create_app() -> FastAPI:
    setup_dataflow_core()
    container.init()
```
The container is initialized at module level (line 16 calls create_app() which initializes the container), but the container could be accessed before init() is called if any module imports from the container. Consider adding guards in the AppContainer to check if init() has been called before accessing the registries, or documenting that container must not be accessed before init() is called.
```python
class VisualizeDatasetService:
    def __init__(self):
        self.pandas_read_func_map = {
            "csv": pd.read_csv,
            "excel": pd.read_excel,
            "json": pd.read_json,
            "parquet": pd.read_parquet,
            "pickle": pd.read_pickle,
            "jsonl": lambda path: pd.read_json(path, lines=True),
        }
        self.media_type_map = {
            "txt": "text/plain",
            "md": "text/markdown",
            "jpg": "image/jpeg",
            "jpeg": "image/jpeg",
            "png": "image/png",
            "gif": "image/gif",
            "pdf": "application/pdf",
            "doc": "application/msword",
            "docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            "ppt": "application/vnd.ms-powerpoint",
            "pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        }

    def get_pandas_read_function(self, ds: dict, start: int = 0, end: int = 5):
        file_type = ds.get("type", "").lower()
        file_path = ds.get("root", "")
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File {file_path} does not exist. Please check the path.")
        if file_type not in self.pandas_read_func_map:
            raise ValueError(f"File type {file_type} is not supported for pandas visualization.")

        read_func = self.pandas_read_func_map.get(file_type, None)
        if not read_func:
            raise ValueError(f"No read function found for type: {file_type}")

        df: pd.DataFrame = read_func(file_path)
        return df.iloc[start:end].to_json(orient="records")

    def list_supported_file_types(self):
        return list(self.pandas_read_func_map.keys())

    def get_other_visualization_data(self, ds: dict):
        file_type = ds.get("type", "").lower()
        file_path = ds.get("root", "")
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File {file_path} does not exist. Please check the path.")
        media_type = self.media_type_map.get(file_type, "application/octet-stream")
        return file_path, media_type
```
The VisualizeDatasetService class was moved from a separate file (visualize_dataset.py) to dataset_registry.py. While this consolidation may make sense from a dependency perspective, it reduces modularity and increases coupling. The VisualizeDatasetService is conceptually a separate concern from DatasetRegistry (one visualizes datasets, the other manages dataset metadata). Consider whether this service should remain in its own module or be nested as an inner class if the tight coupling is intentional.
```python
data = self._read()
api_pipelines_dir = os.path.join(settings.DATAFLOW_CORE_DIR, "api_pipelines")

print(api_pipelines_dir)
```

This print statement should be removed or replaced with a proper logger call. Debug print statements should not be committed to production code.

Suggested change:
```diff
- print(api_pipelines_dir)
+ logger.info(f"API pipelines directory: {api_pipelines_dir}")
```
```diff
  # 基本
  ENV: str = "dev"
- DATA_REGISTRY: str = "data/registry.yaml"
+ DATA_REGISTRY: str = "data/data_registry.yaml"  #
```

The comment appears incomplete. It starts with "#" but doesn't provide any explanation. Either complete the comment to explain why the path was changed from "data/registry.yaml" to "data/data_registry.yaml", or remove the "#" if no comment is needed.

Suggested change:
```diff
- DATA_REGISTRY: str = "data/data_registry.yaml"  #
+ DATA_REGISTRY: str = "data/data_registry.yaml"
```
```python
"""
确保注册表文件存在,并加载api_pipelines目录中的所有py文件
"""
logger.info("初始化Pipeline Registry的注册表...")
```

The log message "初始化Pipeline Registry的注册表..." ("Initializing the Pipeline Registry...") is emitted from the `_ensure()` method, which is called from the constructor, so it will be logged every time a `PipelineRegistry` is instantiated. Since the container creates this as a singleton, this is acceptable, but consider making it clear that this is initialization logging for the singleton instance.

Suggested change:
```diff
- logger.info("初始化Pipeline Registry的注册表...")
+ logger.info("初始化Pipeline Registry的注册表 (单例实例初始化)...")
```
```python
for root, dirs, files in os.walk(pipeline_path):
    for file in files:
        file_path = os.path.join(root, file)

        ds_dict = {
            "name": f"{pipeline_name}-{file}",
            "root": file_path,
            "pipeline": pipeline_name,
            "meta": {},
        }
```
The scan_all_datasets method iterates through all files in all subdirectories using os.walk. However, the dataset name is constructed as "{pipeline_name}-{file}" which only uses the filename, not considering that files might be in nested subdirectories. If two different subdirectories contain files with the same name, they will have conflicting dataset names. The file_path is correctly captured with the full path, but the dataset name could collide. Consider using a more unique naming scheme that includes the relative path or a hash.
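A collision-free scheme along the lines suggested here would include the path relative to the scanned root in the name. A small sketch (the helper function name is hypothetical, not part of the PR):

```python
import os

def dataset_name(pipeline_name, pipeline_path, file_path):
    # Use the path relative to the scanned root instead of just the
    # filename, so same-named files in different subdirectories stay distinct.
    rel = os.path.relpath(file_path, pipeline_path)
    # Normalize separators so names are stable across platforms.
    return f"{pipeline_name}-{rel.replace(os.sep, '/')}"
```

For example, `dataset_name("demo", "/data/demo", "/data/demo/sub/a.csv")` yields `"demo-sub/a.csv"`, whereas the current scheme would produce the same `"demo-a.csv"` for `sub/a.csv` and `other/a.csv`.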
```python
def __init__(self):
    self.dataset_registry = None
    self.operator_registry = None
    self.prompt_registry = None
    self.serving_registry = None
    self.task_registry = None
    self.pipeline_registry = None
    self.dataset_visualize_service = None
```
The AppContainer class has all attributes initialized to None in `__init__`, but there is no protection against accessing them before `init()` is called. If any code accesses `container.dataset_registry` before `container.init()` is invoked, it will receive None, which leads to an AttributeError when a method is called on it. Consider adding a property with a check, or raising a clear error message if a registry is accessed before initialization.
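The guard-style fix could look roughly like this. The error type, message, and the `object()` stand-in are illustrative only:

```python
class ContainerNotInitializedError(RuntimeError):
    pass

class AppContainer:
    def __init__(self):
        self._initialized = False
        self._dataset_registry = None

    def init(self):
        self._dataset_registry = object()  # stand-in for DatasetRegistry()
        self._initialized = True

    @property
    def dataset_registry(self):
        # Fail fast with a clear message instead of silently returning None.
        if not self._initialized:
            raise ContainerNotInitializedError(
                "container.init() must be called before accessing registries"
            )
        return self._dataset_registry
```

An early, descriptive exception at the access site is much easier to diagnose than an AttributeError raised later on a None registry.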
|
Nice refactor! Thank you very much!