diff --git a/dataci/models/script.py b/dataci/models/script.py index 9cf1b66..86243dc 100644 --- a/dataci/models/script.py +++ b/dataci/models/script.py @@ -202,6 +202,8 @@ def copy( dst_path = Path(dst) dst_path.mkdir(parents=True, exist_ok=dirs_exist_ok) for file in self.filelist: + # Create the parent directory if not exist + (dst_path / file).parent.mkdir(parents=True, exist_ok=True) copy_function(self.dir / file, dst_path / file) return dst @@ -309,7 +311,7 @@ def replace_source_segment(source, nodes, replace_segments): replace_segment = indent(dedent(replace_segment), indent_prefix).strip() # Replace code segment new_script += source[prev_end:start] + replace_segment - prev_end = end + 1 + prev_end = end break new_script += source[prev_end:] return new_script @@ -364,7 +366,7 @@ def get_syntax_lines(syntax: Syntax) -> int: Tuple[int, List[Renderable]]: The number of lines and syntax renderables. """ # convert to renderables - renderables = list(text_syntax.__rich_console__(console, console.options)) + renderables = list(syntax.__rich_console__(console, console.options)) # counter # \n in renderables segements = itertools.chain(*map(lambda x: x.segments, renderables)) num_lines = list(map(lambda x: x.text, segements)).count('\n') @@ -412,7 +414,7 @@ def get_syntax_lines(syntax: Syntax) -> int: table.add_column('old-new-sep', justify='right', width=2, style='blue') table.add_column('new_lineno', justify='right', width=lineno_width, style='white') table.add_column('new-text-sep', justify='right', width=2, style='blue') - table.add_column('text', justify='left', style='white') + table.add_column('text', justify='left', min_width=80, style='white') old_lineno, new_lineno = 0, 0 @@ -436,6 +438,7 @@ def get_syntax_lines(syntax: Syntax) -> int: 'python', line_numbers=False, word_wrap=True, + theme='github-dark', background_color='default', line_range=(1, 1), ), @@ -454,6 +457,7 @@ def get_syntax_lines(syntax: Syntax) -> int: 'python', line_numbers=False, 
word_wrap=True, + theme='github-dark', background_color='default', line_range=(old_lineno, old_lineno), code_width=80, diff --git a/dataci/models/workflow.py b/dataci/models/workflow.py index 970477f..395dcba 100644 --- a/dataci/models/workflow.py +++ b/dataci/models/workflow.py @@ -9,13 +9,16 @@ import itertools import json import logging +import multiprocessing as mp import shutil +import sys from abc import ABC from collections import defaultdict from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING +import cloudpickle import networkx as nx from dataci.db.workflow import ( @@ -132,20 +135,39 @@ def from_dict(cls, config: 'dict'): @classmethod def from_path(cls, script_dir: 'Union[str, os.PathLike]', entry_path: 'Union[str, os.PathLike]'): # TODO: make the build process more secure with sandbox / allowed safe methods - local_dict = dict() + def _import_module(entry_module, shared_import_pickle): + import cloudpickle + import importlib + import os + from dataci.models import Workflow + + mod = importlib.import_module(entry_module) + # get all variables from the module + for k, v in mod.__dict__.items(): + if not k.startswith('__') and isinstance(v, Workflow): + shared_import_pickle['__return__'] = cloudpickle.dumps(v) + break + else: + raise ValueError(f'Workflow not found in directory: {os.getcwd()}') + with cwd(script_dir): + import sys entry_file = Path(entry_path) + sys_path = sys.path.copy() + # Append the local dir to the sys path + sys.path.insert(0, '') entry_module = '.'.join(entry_file.parts[:-1] + (entry_file.stem,)) - exec( - f'import os, sys; sys.path.insert(0, os.getcwd()); from {entry_module} import *', - local_dict, local_dict - ) - for v in local_dict.copy().values(): - if isinstance(v, Workflow): - self = v - break - else: - raise ValueError(f'Workflow not found in directory: {script_dir}') + with mp.Manager() as manager: + import_pickle = manager.dict() + p = mp.Process(target=_import_module, args=(entry_module, 
import_pickle,)) + p.start() + p.join() + try: + self = cloudpickle.loads(import_pickle['__return__']) + except KeyError: + raise ValueError(f'Workflow not found in directory: {script_dir}') + # restore sys path + sys.path = sys_path return self @@ -187,6 +209,7 @@ def reload(self, config=None): self.create_date = datetime.fromtimestamp(config['timestamp']) if config['timestamp'] else None self.trigger = [Event.from_str(evt) for evt in config['trigger']] if 'script' in config: + # fixme: reload the object if the script hash is changed self._script = Script.from_dict(config['script']) if 'dag' in config: self._stage_script_paths.clear() @@ -287,12 +310,12 @@ def patch(self, verbose=True, **kwargs): for k, stage in kwargs.items(): # Convert k to full stage name full_k = f'{self.workspace.name}.{k}' if self.workspace else k + # Check if the stage is in the workflow if full_k not in self.stages: raise ValueError(f'Cannot find stage name={k} in workflow {self.name}') - if stage.name != k: - raise ValueError(f'Cannot patch stage {stage.name} to {k} in workflow {self.name}') + # TODO: Check if the stage has the same signature + # Warning if the new stage has different signature with the old stage new_workflow = patch_func(self, source_name=full_k, target=stage, logger=self.logger, verbose=verbose) - new_workflow = self.reload(new_workflow.dict()) return new_workflow diff --git a/dataci/models/workflow_patcher.py b/dataci/models/workflow_patcher.py index a36d870..d95d302 100644 --- a/dataci/models/workflow_patcher.py +++ b/dataci/models/workflow_patcher.py @@ -66,14 +66,16 @@ no need to take care of func / import name """ import ast +import atexit +import inspect import logging import os import re +import tempfile from io import StringIO from pathlib import Path -from pydoc import pipepager +from pydoc import getpager, pipepager from shutil import rmtree -from tempfile import TemporaryDirectory from typing import TYPE_CHECKING, List, Set import git @@ -88,54 +90,72 @@ 
pretty_print_diff, pretty_print_dircmp ) -from dataci.utils import cwd +from dataci.utils import cwd, removeprefix, removesuffix if TYPE_CHECKING: from typing import Any, Tuple, Union from dataci.models import Workflow -def replace_package(basedir: 'Path', source: 'Stage', target: 'Stage'): +def replace_package(basedir: 'Path', source: 'Stage', target: 'Stage', logger: 'logging.Logger'): new_mountdir = basedir / target.name - print(f"replace package {source} -> {target}") + logger.debug(f"replace package {source} -> {target}") rm_files = list(map(lambda x: basedir / x, source.script.filelist)) + source_entry_path_abs = basedir / source.script.entry_path for rm_file in rm_files: - print(f"Remove file '{rm_file}'") - rm_file.unlink() + if rm_file != source_entry_path_abs: + logger.debug(f"Remove file '{rm_file}'") + rm_file.unlink() + else: + logger.debug(f"Truncate file '{rm_file}'") + rm_file.write_text('') for add_file in map(lambda x: new_mountdir / x, target.script.filelist): - print(f"Add file '{add_file}'") + logger.debug(f"Add file '{add_file}'") target.script.copy(new_mountdir, dirs_exist_ok=True) - # Check if the package has '__init__.py' file - if not (new_mountdir / '__init__.py').exists(): - (new_mountdir / '__init__.py').touch() + + # Make the new stage package importable + init_file = new_mountdir / '__init__.py' + if not init_file.exists(): + logger.debug(f"Add file '{init_file}'") + init_file.touch() return new_mountdir -def replace_entry_func(basedir: Path, source: 'Stage', target: 'Stage', fix_import_name: bool): +def replace_entry_func( + basedir: Path, + source: 'Stage', + target: 'Stage', + fix_import_name: bool, + logger: 'logging.Logger' +): new_mountdir = basedir / target.name target_stage_entrypoint = path_to_module_name(new_mountdir.relative_to(basedir)) + '.' + target.script.entrypoint - target_func_name = target_stage_entrypoint.split('.')[-1] - target_stage_mod = '.'.join(target_stage_entrypoint.split('.')[:-1]) or '.' 
source_func_name = source.script.entrypoint.split('.')[-1] - print(f"replace entry func {source} -> {target}") + logger.debug(f"replace entry func {source} -> {target}") modify_file = basedir / source.script.entry_path modify_file_script = modify_file.read_text() new_file_script = replace_source_segment( modify_file_script, source.script.entry_node, - f'from {target_stage_mod} import {target_func_name} as {source_func_name}' if fix_import_name else '', + gen_import_script(target_stage_entrypoint, alias=source_func_name) if fix_import_name else '', ) - print(f"Modify file '{modify_file}'") + logger.debug(f"Modify file '{modify_file}'") modify_file.write_text(new_file_script) for add_file in map(lambda x: new_mountdir / x, target.script.filelist): - print(f"Add file '{add_file}'") + logger.debug(f"Add file '{add_file}'") target.script.copy(new_mountdir, dirs_exist_ok=True) + # Make the new stage package importable + init_file = new_mountdir / '__init__.py' + if not init_file.exists(): + logger.debug(f"Add file '{init_file}'") + init_file.touch() + return new_mountdir @@ -144,11 +164,13 @@ def fixup_entry_func_import_name( source_stage_entrypoint: str, target_stage_entrypoint: str, source_stage_package: str, - replace_package: bool = False, + replace_package: bool, + logger: 'logging.Logger', ): - print('fixup entry func import name') + logger.debug('fixup entry func import name') + source_stage_package = (source_stage_package + '.').lstrip('.') pat = re.compile( - re.escape((source_stage_package + '.').lstrip('.')) + + re.escape(source_stage_package) + r'(.*)\.([^.]+)' ) if match := pat.match(source_stage_entrypoint): @@ -184,93 +206,59 @@ def fixup_entry_func_import_name( # -import stage_root.stg_pkg.func as var_name # +import new_stage_root.new_stage_pkg.new_func as var_name replace_nodes.append(node) - replace_segs.append(f"import {target_stage_entrypoint} as {var_name}") + replace_segs.append(gen_import_script(target_stage_entrypoint, alias=var_name)) else: # 
stage is imported / ref as a package name - if not replace_package: - # Case 1: replace the old function module name with the new function module name - if global_import_name.endswith(func_name): - # i. import statement include the function name, - # replace the import statement, since the function is not valid - # ```diff - # -import stage_root.stg_pkg.func - # +import stage_root.stg_pkg - # +import stage_root.new_stage_pkg.new_func - # +stage_root.stg_pkg.func = new_stage_root.new_stage_pkg.new_func - # ``` - replace_nodes.append(node) - replace_segs.append( - f"import {global_import_name.rstrip(func_name)}\n" - f"import {target_stage_entrypoint}\n" - f"{var_name} = {target_stage_entrypoint}" - ) - else: - # ii. Otherwise, - # ```diff - # import stage_root.stg_pkg as alias - # +import stage_root.new_stage_pkg.new_func - # +alias.func = dag_pkg.new_stage_pkg.func - # ``` - replace_nodes.append(node) - replace_segs.append( - get_source_segment(script, node) + '\n' + - f"import {target_stage_entrypoint}\n" - f"{var_name} = {target_stage_entrypoint}" - ) + # Case 1: replace the old function module name with the new function module name + if global_import_name == source_stage_entrypoint: + # i. import statement include the function name, + # replace the import statement, since the function is not valid + # ```diff + # -import stage_root.stg_pkg.func + # +import stage_root.stg_pkg + # +import stage_root.new_stage_pkg.new_func + # +stage_root.stg_pkg.func = new_stage_root.new_stage_pkg.new_func + # ``` + replace_nodes.append(node) + replace_segs.append( + f"import {removesuffix(global_import_name, '.' 
+ func_name)}\n" + f"{gen_import_script(target_stage_entrypoint, absolute_import=True)}\n" + f"{var_name} = {target_stage_entrypoint}" + ) else: - # Case 2: if stage package is replaced, need to mock the stage package - stage_pkg_mods = global_import_name.lstrip(source_stage_package + '.') - add_lines = list() - if f'{stage_pkg_name}.{func_name}'.lstrip('.').startswith(stage_pkg_mods): - # i. import statement include the stage package name - # Alias import is used - # ```diff - # -import stage_root.stg_mod1 as alias - # +alias = types.ModuleType('stage_root.stg_mod1') - # +alias.stg_mod2 = types.ModuleType('stage_root.stg_mod1.stg_mod2') - # ``` - # Or direct import is used - # ```diff - # -import stage_root.stg_mod1.stg_mod2 - # +import stage_root - # +stage_root.stg_mod1 = types.ModuleType('stage_root.stg_mod1') - # +stage_root.stg_mod1.stg_mod2 = types.ModuleType('stage_root.stg_mod1.stg_mod2') - # ``` - import_name = '' - add_stage_root_import, add_types_import = False, True - for mod in var_name.split('.')[:-1]: - import_name = import_name + '.' + mod if import_name else mod # prevent leading '.' - if source_stage_package.startswith(import_name): - # found: import stage_root_mod1, import stage_root_mod1.stage_root_mod2, etc - add_stage_root_import = True - continue - elif add_stage_root_import: - # Not found stage_root import any further, add stage_root import: - # +import stage_root_mod1.stage_root_mod2 - add_lines.append(f'import {import_name}') - add_stage_root_import = False - if add_types_import: - add_lines.append(f'import types') - add_types_import = False - add_lines.append(f'{import_name} = types.ModuleType({mod!r})') - else: - # ii. Otherwise, do nothing for import fixing - add_lines.append(get_source_segment(script, node)) - - # Add the new function import and fix import + # ii. 
Otherwise, # ```diff - # +import new_stage_root.new_stage_pkg.new_func - # +alias.func = new_stage_root.new_stage_pkg.new_func + # import stage_root.stg_pkg as alias + # +import stage_root.new_stage_pkg.new_func + # +alias.func = dag_pkg.new_stage_pkg.func # ``` - add_lines.append(f'import {target_stage_entrypoint}') - add_lines.append(f'{var_name} = {target_stage_entrypoint}') replace_nodes.append(node) - replace_segs.append(f'{os.linesep}'.join(add_lines)) + replace_segs.append( + get_source_segment(script, node) + '\n' + + f"{gen_import_script(target_stage_entrypoint, absolute_import=True)}\n" + f"{var_name} = {target_stage_entrypoint}" + ) + # if replace_package: + # # if stage package is replaced, need to mock the stage package + # stage_pkg_mods = removeprefix(global_import_name, source_stage_package) + # if f'{stage_pkg_name}.{func_name}'.lstrip('.').startswith(stage_pkg_mods): + # # Fix import path by create a mock package + # stage_base_dir = Path(source_stage_package.replace('.', '/')) + # stage_pkg_file = Path(stage_pkg_name.replace('.', '/')).with_suffix('.py') + # for par in reversed(stage_pkg_file.parents): + # # 1. Create empty python module + # (stage_base_dir / par).mkdir(exist_ok=True) + # if not (init_file := (stage_base_dir / par / '__init__.py')).exists(): + # init_file.touch() + # + # # 2. 
Create empty python file + # if not (stage_file := stage_base_dir / stage_pkg_file).exists(): + # stage_file.touch(exist_ok=True) # Replace all import statements at one time if len(replace_nodes) > 0: new_script = replace_source_segment(script, replace_nodes, replace_segs) - print(f"Modify file '{path}'") + logger.debug(f"Modify file '{path}'") path.write_text(new_script) @@ -281,6 +269,14 @@ def path_to_module_name(path: Path): return '.'.join(path.with_suffix('').parts).strip('/') +def gen_import_script(entrypoint: str, absolute_import: bool = False, alias: str = None): + func_name = entrypoint.split('.')[-1] + mod = '.'.join(entrypoint.split('.')[:-1]) or '.' + if absolute_import: + return f'import {mod}' + return f'from {mod} import {func_name}' + (f' as {alias}' if (alias and alias != func_name) else '') + + def get_all_predecessors( g: 'Union[nx.MultiDiGraph, nx.DiGraph]', source: 'Union[Set, List]', @@ -415,8 +411,9 @@ def patch( f'{entrypoint_outer_caller}\n' ) - tmp_dir = TemporaryDirectory(dir=workflow.workspace.tmp_dir) - tmp_dir = Path(tmp_dir.name) + tmp_dir = tempfile.mkdtemp(dir=workflow.workspace.tmp_dir) + atexit.register(tempfile.TemporaryDirectory._rmtree, tmp_dir) + tmp_dir = Path(tmp_dir) # Copy the dag package to a temp dir workflow.script.copy(tmp_dir, dirs_exist_ok=True) @@ -439,22 +436,36 @@ def patch( ) if flg_replace_pkg: - new_stage_dir = replace_package(stage_base_dir, workflow.stages[source_name], target) + new_stage_dir = replace_package( + basedir=stage_base_dir, + source=workflow.stages[source_name], + target=target, + logger=logger + ) else: - new_stage_dir = replace_entry_func(stage_base_dir, workflow.stages[source_name], target, flg_fix_import_name) + new_stage_dir = replace_entry_func( + basedir=stage_base_dir, + source=workflow.stages[source_name], + target=target, + fix_import_name=flg_fix_import_name, + logger=logger + ) new_entrypoint = path_to_module_name(new_stage_dir.relative_to(tmp_dir)) + '.' 
+ target.script.entrypoint if flg_fix_import_name: paths = list() for caller in entrypoint_callers & package_nodes - {top_node_id}: label = call_graph.nodes[caller]['label'] - paths.append((tmp_dir / label.replace('.', '/')).with_suffix('.py')) + path = (tmp_dir / label.replace('.', '/')).with_suffix('.py') + if path.exists(): + paths.append(path) fixup_entry_func_import_name( paths=paths, source_stage_entrypoint=entrypoint, target_stage_entrypoint=new_entrypoint, source_stage_package=package, - replace_package=bool(len(inner_func_outer_callers)) + replace_package=flg_replace_pkg, + logger=logger, ) if verbose: @@ -467,7 +478,14 @@ def patch( logger.info('Code diff:') diff_log_str = StringIO() pretty_print_diff(diffs, file=diff_log_str) - pipepager(diff_log_str.getvalue(), cmd='less -R') + + # Print to pager, overwrite the default pager to support color output + pager = getpager() + # pager is a function that calls `less` + pager_code = inspect.getsource(pager).strip() + if pager_code == 'return lambda text: pipepager(text, \'less\')': + pager = lambda text: pipepager(text, 'less -R') + pager(diff_log_str.getvalue()) # Clean up git if not git_exists: diff --git a/dataci/plugins/orchestrator/airflow.py b/dataci/plugins/orchestrator/airflow.py index e7c08d4..d3331e4 100644 --- a/dataci/plugins/orchestrator/airflow.py +++ b/dataci/plugins/orchestrator/airflow.py @@ -273,7 +273,7 @@ def script(self) -> Script: # 2.
locate the function definition tree = ast.parse(Path(fileloc).read_text()) func_node = locate_stage_function(tree, self.name)[0] - assert len(func_node) == 1, f'Found multiple function definition for stage {self.name} in {entryfile}' + assert len(func_node) == 1, f'Found {len(func_node)} function definition for stage {self.name} in {entryfile}' self._script = Script( dir=fileloc.parent, entry_path=entryfile, entry_node=func_node[0], local_dir=fileloc.parent, filelist=[fileloc.relative_to(fileloc.parent)], diff --git a/dataci/plugins/orchestrator/script.py b/dataci/plugins/orchestrator/script.py index c8f4f53..34c19cf 100644 --- a/dataci/plugins/orchestrator/script.py +++ b/dataci/plugins/orchestrator/script.py @@ -102,7 +102,7 @@ def locate_stage_function( elif node.name in stage_names: stage_nodes.append(node) deco_nodes.append(decorator) - return stage_nodes, deco_nodes + return stage_nodes, deco_nodes def locate_dag_function( diff --git a/dataci/utils.py b/dataci/utils.py index 1160648..9dd3d8c 100644 --- a/dataci/utils.py +++ b/dataci/utils.py @@ -7,6 +7,7 @@ """ import hashlib import os +import sys from contextlib import contextmanager from pathlib import Path from typing import TYPE_CHECKING @@ -51,7 +52,8 @@ def hash_file(filepaths: 'Union[str, os.PathLike, List[Union[os.PathLike, str]]] if isinstance(filepaths, (str, os.PathLike)): filepaths = [filepaths] # Find common prefix - root = os.path.commonpath(filepaths) + root = Path(os.path.commonpath(filepaths)) + root = root.parent if root.is_file() else root # Tree scan of all file paths / directories paths = list() for path in filepaths: @@ -93,3 +95,14 @@ def hash_binary(b: bytes): sha_hash.update(b) return sha_hash.hexdigest() + + +if sys.version_info < (3, 9): + def removeprefix(text, prefix): + return text[text.startswith(prefix) and len(prefix):] + + def removesuffix(text, suffix): + return text[:len(text) - len(suffix)] if text.endswith(suffix) else text +else: + removeprefix = str.removeprefix + 
removesuffix = str.removesuffix diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/base.py b/tests/base.py new file mode 100644 index 0000000..817733a --- /dev/null +++ b/tests/base.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Oct 13, 2023 +""" +import abc +import inspect +import unittest + + +class AbstractTestCase(unittest.TestCase, abc.ABC): + + def __new__(cls, *args, **kwargs): + """Prevent the abstract class from running the test cases.""" + if inspect.isabstract(cls): + self = unittest.TestCase() + self.runTest = unittest.skip(f"Skip the abstract test case {cls.__name__}.")(lambda: None) + return self + return super().__new__(cls) diff --git a/tests/test_workflow_patch.py b/tests/test_workflow_patch.py new file mode 100644 index 0000000..544df21 --- /dev/null +++ b/tests/test_workflow_patch.py @@ -0,0 +1,293 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Oct 12, 2023 +""" +import abc +import unittest +from pathlib import Path +from typing import TYPE_CHECKING + +from dataci.utils import cwd +from tests.base import AbstractTestCase + +if TYPE_CHECKING: + from dataci.models import Workflow + +TEST_DIR = Path(__file__).parent + + +class TestWorkflowPatchSingleFilePipelineBase(AbstractTestCase, abc.ABC): + def setUp(self): + """Set up test fixtures. + 1. Create a test workspace and set it as the current default workspace + 2. Save and publish a test dataset + 3. Save the test pipeline + """ + from dataci.models import workspace + from dataci.models import Dataset + + workspace.DEFAULT_WORKSPACE = 'test' + self.test_dataset = Dataset('yelp_review_test', dataset_files=[ + {'date': '2020-10-05 00:44:08', 'review_id': 'HWRpzNHPqjA4pxN5863QUA', 'stars': 5.0, + 'text': "I called Anytime on Friday afternoon about the number pad lock on my front door. 
After several questions, the gentleman asked me if I had changed the battery.", }, + {'date': '2020-10-15 04:34:49', 'review_id': '01plHaNGM92IT0LLcHjovQ', 'stars': 5.0, + 'text': "Friend took me for lunch. Ordered the Chicken Pecan Tart although it was like a piece quiche, was absolutely delicious!", }, + {'date': '2020-10-17 06:58:09', 'review_id': '7CDDSuzoxTr4H5N4lOi9zw', 'stars': 4.0, + 'text': "I love coming here for my fruit and vegetables. It is always fresh and a great variety. The bags of already diced veggies are a huge time saver.", }, + ]) + self.assertEqual( + self.test_dataset.workspace.name, + 'test', + "Failed to set the default workspace to `test`." + ) + self.test_dataset.publish(version_tag='2020-10') + self._workflow = None + self.workflow.save() + + @property + @abc.abstractmethod + def workflow(self) -> 'Workflow': + pass + + def test_patch_standalone_stage(self): + from tests.workflow_patch.single_file_stage_v2 import single_file_stage_v2 + + # Patch the standalone stage + new_workflow = self.workflow.patch(step0_standalone_stage=single_file_stage_v2) + # Check if the pipeline is patched + self.assertIn( + single_file_stage_v2.full_name, + new_workflow.stages.keys(), + "Failed to patch the `step0_standalone_stage` stage." + ) + new_workflow.test() + + def test_patch_intra_deps_stage(self): + from tests.workflow_patch.single_file_stage_v2 import single_file_stage_v2 + + # Patch the standalone stage + new_workflow = self.workflow.patch(step1_intra_deps_stage=single_file_stage_v2) + # Check if the pipeline is patched + self.assertIn( + single_file_stage_v2.full_name, + new_workflow.stages.keys(), + "Failed to patch the `step1_intra_deps_stage` stage." 
+ ) + new_workflow.test() + + def test_patch_unused_stage(self): + from tests.workflow_patch.single_file_stage_v2 import single_file_stage_v2 + + # Check if the pipeline is patched + with self.assertRaises(ValueError) as context: + # Patch the standalone stage + new_workflow = self.workflow.patch(unused_stage=single_file_stage_v2) + self.assertIn( + f'Cannot find stage name=unused_stage in workflow', + context.exception.args[0], + "Failed to raise exception for patching `unused_stage` stage." + ) + + +class TestWorkflowPatchSingleFilePipelineNormal(TestWorkflowPatchSingleFilePipelineBase): + @property + def workflow(self): + if self._workflow is None: + from tests.workflow_patch.single_file_pipeline.normal_pipeline import text_process_ci_pipeline + + self._workflow = text_process_ci_pipeline + return self._workflow + + +@unittest.skip("Skip the test cases for local defined stages.") +class TestWorkflowPatchSingleFilePipelineLocalDefStage( + TestWorkflowPatchSingleFilePipelineBase +): + @property + def workflow(self): + if self._workflow is None: + from tests.workflow_patch.single_file_pipeline.local_def_stage_pipeline import text_process_ci_pipeline + + self._workflow = text_process_ci_pipeline + return self._workflow + + +class TestWorkflowPatchMultiFilePipelineBase(AbstractTestCase, abc.ABC): + """Test cases for patching the multi-file pipeline.""" + + def setUp(self): + """Set up test fixtures. + 1. Create a test workspace and set it as the current default workspace + 2. Save and publish a test dataset + 3. Save the test pipeline + """ + from dataci.models import workspace + from dataci.models import Dataset + + workspace.DEFAULT_WORKSPACE = 'test' + self.test_dataset = Dataset('yelp_review_test', dataset_files=[ + {'date': '2020-10-05 00:44:08', 'review_id': 'HWRpzNHPqjA4pxN5863QUA', 'stars': 5.0, + 'text': "I called Anytime on Friday afternoon about the number pad lock on my front door. 
After several questions, the gentleman asked me if I had changed the battery.", }, + {'date': '2020-10-15 04:34:49', 'review_id': '01plHaNGM92IT0LLcHjovQ', 'stars': 5.0, + 'text': "Friend took me for lunch. Ordered the Chicken Pecan Tart although it was like a piece quiche, was absolutely delicious!", }, + {'date': '2020-10-17 06:58:09', 'review_id': '7CDDSuzoxTr4H5N4lOi9zw', 'stars': 4.0, + 'text': "I love coming here for my fruit and vegetables. It is always fresh and a great variety. The bags of already diced veggies are a huge time saver.", }, + ]) + self.assertEqual( + self.test_dataset.workspace.name, + 'test', + "Failed to set the default workspace to `test`." + ) + self.test_dataset.publish(version_tag='2020-10') + self._workflow = None + self.workflow.save() + + from tests.workflow_patch.single_file_stage_v2 import single_file_stage_v2 + self.new_stage = single_file_stage_v2 + + @property + @abc.abstractmethod + def workflow(self) -> 'Workflow': + pass + + def test_patch_unused_stage(self): + # Check if the pipeline is patched + with self.assertRaises(ValueError) as context: + # Patch the standalone stage + new_workflow = self.workflow.patch(unused_stage=self.new_stage) + self.assertIn( + f'Cannot find stage name=unused_stage in workflow', + context.exception.args[0], + "Failed to raise exception for patching `unused_stage` stage." + ) + + def test_unused_stage_w_util(self): + # Check if the pipeline is patched + with self.assertRaises(ValueError) as context: + # Patch the standalone stage + new_workflow = self.workflow.patch(unused_stage_w_util=self.new_stage) + self.assertIn( + f'Cannot find stage name=unused_stage_w_util in workflow', + context.exception.args[0], + "Failed to raise exception for patching `unused_stage_w_util` stage." 
+ ) + + def test_unused_stage_w_util2(self): + # Check if the pipeline is patched + with self.assertRaises(ValueError) as context: + # Patch the standalone stage + new_workflow = self.workflow.patch(unused_stage_w_util2=self.new_stage) + self.assertIn( + f'Cannot find stage name=unused_stage_w_util2 in workflow', + context.exception.args[0], + "Failed to raise exception for patching `unused_stage_w_util2` stage." + ) + + def test_used_stage(self): + # Patch the standalone stage + new_workflow = self.workflow.patch(used_stage=self.new_stage) + # Check if the pipeline is patched + self.assertIn( + self.new_stage.full_name, + new_workflow.stages.keys(), + "Failed to patch the `used_stage` stage." + ) + new_workflow.test() + + def test_used_stage_w_util(self): + # Patch the standalone stage + new_workflow = self.workflow.patch(used_stage_w_util=self.new_stage) + # Check if the pipeline is patched + self.assertIn( + self.new_stage.full_name, + new_workflow.stages.keys(), + "Failed to patch the `used_stage_w_util` stage." + ) + new_workflow.test() + + def test_multi_file_stage(self): + # Patch the standalone stage + new_workflow = self.workflow.patch(multi_file_stage=self.new_stage) + # Check if the pipeline is patched + self.assertIn( + self.new_stage.full_name, + new_workflow.stages.keys(), + "Failed to patch the `multi_file_stage` stage." + ) + new_workflow.test() + + def test_multi_file_stage_w_util(self): + # Patch the standalone stage + new_workflow = self.workflow.patch(multi_file_stage_w_util=self.new_stage) + # Check if the pipeline is patched + self.assertIn( + self.new_stage.full_name, + new_workflow.stages.keys(), + "Failed to patch the `multi_file_stage_w_util` stage." 
+ ) + new_workflow.test() + + +class TestWorkflowPatchMultiFilePipelineNormal(TestWorkflowPatchMultiFilePipelineBase): + @property + def workflow(self): + if self._workflow is None: + from dataci.models import Workflow + self._workflow = Workflow.from_path( + TEST_DIR / 'workflow_patch/multi_file_pipeline', + entry_path='normal_import_pipeline.py' + ) + return self._workflow + + +class TestWorkflowPatchMultiFilePipelineImportAlias(TestWorkflowPatchMultiFilePipelineBase): + @property + def workflow(self): + if self._workflow is None: + from dataci.models import Workflow + self._workflow = Workflow.from_path( + TEST_DIR / 'workflow_patch/multi_file_pipeline', + entry_path='import_alias_pipeline.py' + ) + return self._workflow + + +@unittest.skip("Skip the test cases for import and assign to var, not working now.") +class TestWorkflowPatchMultiFilePipelineImportAndAssignToVar(TestWorkflowPatchMultiFilePipelineBase): + @property + def workflow(self): + if self._workflow is None: + from dataci.models import Workflow + self._workflow = Workflow.from_path( + TEST_DIR / 'workflow_patch/multi_file_pipeline', + entry_path='import_and_assign_to_var_pipeline.py' + ) + return self._workflow + + +@unittest.skip("Skip the test cases for local import, not working now.") +class TestWorkflowPatchMultiFilePipelineLocalImport(TestWorkflowPatchMultiFilePipelineBase): + @property + def workflow(self): + if self._workflow is None: + from dataci.models import Workflow + self._workflow = Workflow.from_path( + TEST_DIR / 'workflow_patch/multi_file_pipeline', + entry_path='local_import_pipeline.py' + ) + return self._workflow + + +class TestWorkflowPatchMultiFilePipelineMultilineImport(TestWorkflowPatchMultiFilePipelineBase): + @property + def workflow(self): + if self._workflow is None: + from dataci.models import Workflow + self._workflow = Workflow.from_path( + TEST_DIR / 'workflow_patch/multi_file_pipeline', + entry_path='multiline_import_pipeline.py' + ) + return self._workflow diff 
--git a/tests/workflow_patch/multi_file_pipeline/import_alias_pipeline.py b/tests/workflow_patch/multi_file_pipeline/import_alias_pipeline.py new file mode 100644 index 0000000..5404975 --- /dev/null +++ b/tests/workflow_patch/multi_file_pipeline/import_alias_pipeline.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Aug 22, 2023 +""" +from datetime import datetime +from dataci.plugins.decorators import dag, Dataset + + +# from stage_mod import func_name as func_name_alias +from step03_used_stage import used_stage as step03_stage +# import stage_mode, use as: stage_mod.func_name +import step04_used_stage_w_util as step04_mod + +import step05_multi_file_stage_w_util.multi_file_stage_w_util as step05_mod +import step06_multi_file_stage.multi_file_stage as step06_mod + +from utils import common_util + + +@dag( + start_date=datetime(2020, 7, 30), schedule=None, +) +def import_alias_pipeline(): + common_util() + raw_dataset_train = Dataset.get('yelp_review@latest') + df = step03_stage(raw_dataset_train) + df = step04_mod.used_stage_w_util(df) + df = step05_mod.multi_file_stage_w_util(df) + df = step06_mod.multi_file_stage(df) + Dataset(name='text_aug', dataset_files=df) + + +# Build the pipeline +import_alias_dag = import_alias_pipeline() diff --git a/tests/workflow_patch/multi_file_pipeline/import_and_assign_to_var_pipeline.py b/tests/workflow_patch/multi_file_pipeline/import_and_assign_to_var_pipeline.py new file mode 100644 index 0000000..e402600 --- /dev/null +++ b/tests/workflow_patch/multi_file_pipeline/import_and_assign_to_var_pipeline.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Aug 22, 2023 +""" +from datetime import datetime +from dataci.plugins.decorators import dag, Dataset + + +import step03_used_stage +import step04_used_stage_w_util +import 
step05_multi_file_stage_w_util.multi_file_stage_w_util +import step06_multi_file_stage.multi_file_stage +from utils import common_util + +import types + +packages = types.ModuleType('packages') +packages.step1_stage = step03_used_stage.used_stage +packages.step2_stage = step04_used_stage_w_util.used_stage_w_util +packages.step3_stage = step05_multi_file_stage_w_util.multi_file_stage_w_util.multi_file_stage_w_util +packages.step4_stage = step06_multi_file_stage.multi_file_stage.multi_file_stage + + +@dag( + start_date=datetime(2020, 7, 30), schedule=None, +) +def import_and_assign_to_var_pipeline(): + common_util() + raw_dataset_train = Dataset.get('yelp_review@latest') + df = packages.step1_stage(raw_dataset_train) + df = packages.step2_stage(df) + df = packages.step3_stage(df) + df = packages.step4_stage(df) + Dataset(name='text_aug', dataset_files=df) + + +# Build the pipeline +import_and_assign_to_var_dag = import_and_assign_to_var_pipeline() diff --git a/tests/workflow_patch/multi_file_pipeline/local_import_pipeline.py b/tests/workflow_patch/multi_file_pipeline/local_import_pipeline.py new file mode 100644 index 0000000..2dcaa79 --- /dev/null +++ b/tests/workflow_patch/multi_file_pipeline/local_import_pipeline.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Aug 22, 2023 +""" +from datetime import datetime +from dataci.plugins.decorators import dag, Dataset + + +@dag( + start_date=datetime(2020, 7, 30), schedule=None, +) +def local_import_pipeline(): + from step03_used_stage import used_stage + from step04_used_stage_w_util import used_stage_w_util + from step05_multi_file_stage_w_util.multi_file_stage_w_util import multi_file_stage_w_util + from step06_multi_file_stage.multi_file_stage import multi_file_stage + from utils import common_util + + common_util() + raw_dataset_train = Dataset.get('test_yelp_review@latest') + df = used_stage(raw_dataset_train) + df = 
used_stage_w_util(df) + df = multi_file_stage_w_util(df) + df = multi_file_stage(df) + Dataset(name='test_text_aug', dataset_files=df) + + +# Build the pipeline +local_import_dag = local_import_pipeline() diff --git a/tests/workflow_patch/multi_file_pipeline/multiline_import_pipeline.py b/tests/workflow_patch/multi_file_pipeline/multiline_import_pipeline.py new file mode 100644 index 0000000..9eff561 --- /dev/null +++ b/tests/workflow_patch/multi_file_pipeline/multiline_import_pipeline.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Aug 22, 2023 +""" +from datetime import datetime +from dataci.plugins.decorators import dag, Dataset + + +from step03_used_stage import used_stage; from step04_used_stage_w_util import used_stage_w_util; +from step03_used_stage import used_stage; from \ + step04_used_stage_w_util import \ + used_stage_w_util +from step05_multi_file_stage_w_util import ( + multi_file_stage_w_util, + utils +) +from step05_multi_file_stage_w_util import multi_file_stage_w_util, utils +from step05_multi_file_stage_w_util import multi_file_stage_w_util, \ + utils +from step06_multi_file_stage.multi_file_stage import ( + multi_file_stage, +) +from utils import common_util + + +@dag( + start_date=datetime(2020, 7, 30), schedule=None, +) +def multiline_import_pipeline(): + common_util() + raw_dataset_train = Dataset.get('yelp_review@latest') + df = used_stage(raw_dataset_train) + df = used_stage_w_util(df) + df = multi_file_stage_w_util.multi_file_stage_w_util(df) + df = multi_file_stage(df) + Dataset(name='text_aug', dataset_files=df) + + +# Build the pipeline +multiline_import_dag = multiline_import_pipeline() diff --git a/tests/workflow_patch/multi_file_pipeline/normal_import_pipeline.py b/tests/workflow_patch/multi_file_pipeline/normal_import_pipeline.py new file mode 100644 index 0000000..e9ac608 --- /dev/null +++ 
b/tests/workflow_patch/multi_file_pipeline/normal_import_pipeline.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Aug 22, 2023 +""" +from datetime import datetime +from dataci.plugins.decorators import dag, Dataset + + +from step03_used_stage import used_stage +from step04_used_stage_w_util import used_stage_w_util +from step05_multi_file_stage_w_util.multi_file_stage_w_util import multi_file_stage_w_util +from step06_multi_file_stage.multi_file_stage import multi_file_stage +from utils import common_util + + +@dag( + start_date=datetime(2020, 7, 30), schedule=None, +) +def normal_import_pipeline(): + common_util() + raw_dataset_train = Dataset.get('yelp_review@latest') + df = used_stage(raw_dataset_train) + df = used_stage_w_util(df) + df = multi_file_stage_w_util(df) + df = multi_file_stage(df) + Dataset(name='text_aug', dataset_files=df) + + +# Build the pipeline +normal_import_dag = normal_import_pipeline() diff --git a/tests/workflow_patch/multi_file_pipeline/step00_unused_stage.py b/tests/workflow_patch/multi_file_pipeline/step00_unused_stage.py new file mode 100644 index 0000000..f690b43 --- /dev/null +++ b/tests/workflow_patch/multi_file_pipeline/step00_unused_stage.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Oct 11, 2023 +""" +from dataci.plugins.decorators import stage +from step02_with_util_stage import in_stage_util_function + +@stage +def unused_stage(df): + in_stage_util_function() + # random data selection + return df.sample(frac=1) \ No newline at end of file diff --git a/tests/workflow_patch/multi_file_pipeline/step01_unused_stage_w_util.py b/tests/workflow_patch/multi_file_pipeline/step01_unused_stage_w_util.py new file mode 100644 index 0000000..0df57fe --- /dev/null +++ b/tests/workflow_patch/multi_file_pipeline/step01_unused_stage_w_util.py @@ -0,0 +1,18 @@ +#!/usr/bin/env 
python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Oct 11, 2023 +""" +from dataci.plugins.decorators import stage + + +@stage +def unused_stage_w_util(df): + # random data selection + return df.sample(frac=1) + + +def outer_used_util(): + pass diff --git a/tests/workflow_patch/multi_file_pipeline/step02_unused_stage_w_util2.py b/tests/workflow_patch/multi_file_pipeline/step02_unused_stage_w_util2.py new file mode 100644 index 0000000..0892cba --- /dev/null +++ b/tests/workflow_patch/multi_file_pipeline/step02_unused_stage_w_util2.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Oct 11, 2023 +""" +from dataci.plugins.decorators import stage + +import augly.text as textaugs + + +def outer_used_util2(): + # use inner stage + print(unused_stage_w_util2) + + +@stage +def unused_stage_w_util2(df): + # random data selection + return df.sample(frac=1) \ No newline at end of file diff --git a/tests/workflow_patch/multi_file_pipeline/step03_used_stage.py b/tests/workflow_patch/multi_file_pipeline/step03_used_stage.py new file mode 100644 index 0000000..3c37374 --- /dev/null +++ b/tests/workflow_patch/multi_file_pipeline/step03_used_stage.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Oct 11, 2023 +""" +from dataci.plugins.decorators import stage + + +@stage +def used_stage(df): + # random data selection + return df.sample(frac=1) diff --git a/tests/workflow_patch/multi_file_pipeline/step04_used_stage_w_util.py b/tests/workflow_patch/multi_file_pipeline/step04_used_stage_w_util.py new file mode 100644 index 0000000..9054ac8 --- /dev/null +++ b/tests/workflow_patch/multi_file_pipeline/step04_used_stage_w_util.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Oct 11, 2023 +""" +from 
dataci.plugins.decorators import stage + + +@stage +def used_stage_w_util(df): + # random data selection + return df.sample(frac=1) + + +def outer_used_util(): + # use inner stage + print(used_stage_w_util) diff --git a/tests/workflow_patch/multi_file_pipeline/step05_multi_file_stage_w_util/__init__.py b/tests/workflow_patch/multi_file_pipeline/step05_multi_file_stage_w_util/__init__.py new file mode 100644 index 0000000..3163bd0 --- /dev/null +++ b/tests/workflow_patch/multi_file_pipeline/step05_multi_file_stage_w_util/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Oct 11, 2023 +""" \ No newline at end of file diff --git a/tests/workflow_patch/multi_file_pipeline/step05_multi_file_stage_w_util/multi_file_stage_w_util.py b/tests/workflow_patch/multi_file_pipeline/step05_multi_file_stage_w_util/multi_file_stage_w_util.py new file mode 100644 index 0000000..516c844 --- /dev/null +++ b/tests/workflow_patch/multi_file_pipeline/step05_multi_file_stage_w_util/multi_file_stage_w_util.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Oct 11, 2023 +""" +from dataci.plugins.decorators import stage + + +@stage +def multi_file_stage_w_util(df): + return df diff --git a/tests/workflow_patch/multi_file_pipeline/step05_multi_file_stage_w_util/utils.py b/tests/workflow_patch/multi_file_pipeline/step05_multi_file_stage_w_util/utils.py new file mode 100644 index 0000000..27f9ca5 --- /dev/null +++ b/tests/workflow_patch/multi_file_pipeline/step05_multi_file_stage_w_util/utils.py @@ -0,0 +1,9 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Oct 11, 2023 +""" +def some_util_function(): + pass diff --git a/tests/workflow_patch/multi_file_pipeline/step06_multi_file_stage/__init__.py 
b/tests/workflow_patch/multi_file_pipeline/step06_multi_file_stage/__init__.py new file mode 100644 index 0000000..3163bd0 --- /dev/null +++ b/tests/workflow_patch/multi_file_pipeline/step06_multi_file_stage/__init__.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Oct 11, 2023 +""" \ No newline at end of file diff --git a/tests/workflow_patch/multi_file_pipeline/step06_multi_file_stage/multi_file_stage.py b/tests/workflow_patch/multi_file_pipeline/step06_multi_file_stage/multi_file_stage.py new file mode 100644 index 0000000..e3557b8 --- /dev/null +++ b/tests/workflow_patch/multi_file_pipeline/step06_multi_file_stage/multi_file_stage.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Oct 11, 2023 +""" +from dataci.plugins.decorators import stage + + +@stage +def multi_file_stage(df): + # random data selection + return df.sample(frac=1) diff --git a/tests/workflow_patch/multi_file_pipeline/utils.py b/tests/workflow_patch/multi_file_pipeline/utils.py new file mode 100644 index 0000000..8160e42 --- /dev/null +++ b/tests/workflow_patch/multi_file_pipeline/utils.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Oct 11, 2023 +""" +from step01_unused_stage_w_util import outer_used_util +from step04_used_stage_w_util import outer_used_util as outer_used_util2 +from step05_multi_file_stage_w_util.utils import some_util_function + +def common_util(): + outer_used_util() + outer_used_util2() + some_util_function() diff --git a/tests/workflow_patch/single_file_pipeline/README.md b/tests/workflow_patch/single_file_pipeline/README.md new file mode 100644 index 0000000..596f927 --- /dev/null +++ b/tests/workflow_patch/single_file_pipeline/README.md @@ -0,0 +1 @@ +This is a README file for the single_file_pipeline test.
\ No newline at end of file diff --git a/tests/workflow_patch/single_file_pipeline/local_def_stage_pipeline.py b/tests/workflow_patch/single_file_pipeline/local_def_stage_pipeline.py new file mode 100644 index 0000000..0f29c3c --- /dev/null +++ b/tests/workflow_patch/single_file_pipeline/local_def_stage_pipeline.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Aug 22, 2023 +""" +import augly.text as textaugs + +from dataci.plugins.decorators import stage + + +def common_util_function(): + # do nothing here + pass + + +from datetime import datetime +from dataci.plugins.decorators import dag, Dataset + + +@dag( + start_date=datetime(2020, 7, 30), schedule=None, +) +def text_process_ci(): + @stage + def unused_stage(df): + common_util_function() + return df + + @stage + def step0_standalone_stage(df): + return df.sample(frac=1) + + @stage + def step1_intra_deps_stage(df): + common_util_function() + aug_function = textaugs.ReplaceSimilarUnicodeChars() + df['text'] = aug_function(df['text'].tolist()) + return df + + raw_dataset_train = Dataset.get('yelp_review@latest') + df = step0_standalone_stage(raw_dataset_train) + step1_intra_deps_stage(df) + + +# Build the pipeline +text_process_ci_pipeline = text_process_ci() diff --git a/tests/workflow_patch/single_file_pipeline/normal_pipeline.py b/tests/workflow_patch/single_file_pipeline/normal_pipeline.py new file mode 100644 index 0000000..e3cec54 --- /dev/null +++ b/tests/workflow_patch/single_file_pipeline/normal_pipeline.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Aug 22, 2023 +""" +import augly.text as textaugs + +from dataci.plugins.decorators import stage + + +def common_util_function(): + # do nothing here + pass + + +@stage +def unused_stage(df): + common_util_function() + return df + + +@stage +def step0_standalone_stage(df): + return 
df.sample(frac=1) + + +@stage +def step1_intra_deps_stage(df): + common_util_function() + aug_function = textaugs.ReplaceSimilarUnicodeChars() + df['text'] = aug_function(df['text'].tolist()) + return df + + +from datetime import datetime +from dataci.plugins.decorators import dag, Dataset + + +@dag( + start_date=datetime(2020, 7, 30), schedule=None, +) +def text_process_ci(): + raw_dataset_train = Dataset.get('yelp_review@latest') + df = step0_standalone_stage(raw_dataset_train) + step1_intra_deps_stage(df) + + +# Build the pipeline +text_process_ci_pipeline = text_process_ci() diff --git a/tests/workflow_patch/single_file_stage_v2.py b/tests/workflow_patch/single_file_stage_v2.py new file mode 100644 index 0000000..2dff610 --- /dev/null +++ b/tests/workflow_patch/single_file_stage_v2.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Author: Li Yuanming +Email: yuanmingleee@gmail.com +Date: Oct 11, 2023 +""" +from dataci.plugins.decorators import stage + + +@stage +def single_file_stage_v2(df): + """A single file stage v2 to patch""" + # random data selection + return df.sample(frac=1)