Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RayTransformConfiguration to a capture runtime_class #105

Merged
merged 1 commit into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .make.defaults
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ __check_defined = \

.PHONY: .defaults.check.installed
.defaults.check.installed::
if [ ! command -v $(CHECK_RUNNABLE) &>/dev/null ]; then \
@if [ ! command -v $(CHECK_RUNNABLE) &>/dev/null ]; then \
echo $(CHECK_RUNNABLE) must be installed; \
exit 1; \
fi
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from data_processing.launch.ray import DefaultTableTransformRuntimeRay
from data_processing.transform.transform_configuration import (
TransformConfiguration,
TransformConfigurationProxy,
)


class RayTransformConfiguration(TransformConfigurationProxy):
def __init__(
self,
transform_config: TransformConfiguration,
runtime_class: type[DefaultTableTransformRuntimeRay] = DefaultTableTransformRuntimeRay,
):
"""
Initialization
:param transform_class: implementation of the transform
:param runtime_class: implementation of the transform runtime
:param base: base transform configuration class
:param name: transform name
:param remove_from_metadata: list of parameters to remove from metadata
:return:
"""
super().__init__(
proxied_transform_config=transform_config,
)
self.runtime_class = runtime_class

def create_transform_runtime(self) -> DefaultTableTransformRuntimeRay:
"""
Create transform runtime with the parameters captured during apply_input_params()
:return: transform runtime object
"""
return self.runtime_class(self.params)
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

import ray
from data_processing.data_access import DataAccessFactory, DataAccessFactoryBase
from data_processing.transform import TransformConfiguration
from data_processing.launch.ray import (
DefaultTableTransformRuntimeRay,
RayLauncherConfiguration,
TransformOrchestratorConfiguration,
orchestrate, DefaultTableTransformRuntimeRay,
orchestrate,
)
from data_processing.launch.ray.transform_configuration import RayTransformConfiguration
from data_processing.launch.transform_launcher import AbstractTransformLauncher
from data_processing.transform import TransformConfiguration
from data_processing.utils import get_logger, str2bool


Expand All @@ -36,8 +38,8 @@ class RayTransformLauncher(AbstractTransformLauncher):

def __init__(
self,
transform_config: TransformConfiguration,
runtime_class: type[DefaultTableTransformRuntimeRay]=DefaultTableTransformRuntimeRay,
transform_config: RayTransformConfiguration,
# runtime_class: type[DefaultTableTransformRuntimeRay]=DefaultTableTransformRuntimeRay,
data_access_factory: DataAccessFactoryBase = DataAccessFactory(),
):
"""
Expand All @@ -46,7 +48,7 @@ def __init__(
:param data_access_factory: the factory to create DataAccess instances.
"""
super().__init__(transform_config, data_access_factory)
self.transform_runtime_config = RayLauncherConfiguration(transform_config, runtime_class)
self.transform_runtime_config = RayLauncherConfiguration(transform_config, transform_config.runtime_class)
self.ray_orchestrator = TransformOrchestratorConfiguration(name=self.name)

def __get_parameters(self) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
from typing import Any

import pyarrow as pa

from data_processing.launch.pure_python import PythonTransformLauncher
from data_processing.launch.ray.transform_configuration import RayTransformConfiguration
from data_processing.transform import AbstractTableTransform, TransformConfiguration
from data_processing.utils import CLIArgumentProvider, get_logger

Expand Down Expand Up @@ -65,6 +65,7 @@ def transform(self, table: pa.Table) -> tuple[list[pa.Table], dict[str, Any]]:
metadata = {"nfiles": 1, "nrows": len(table)}
return [table], metadata


class NOOPTransformConfiguration(TransformConfiguration):

"""
Expand Down Expand Up @@ -118,6 +119,10 @@ def apply_input_params(self, args: Namespace) -> bool:
return True


class NOOPRayTransformConfiguration(RayTransformConfiguration):
def __init__(self):
super().__init__(NOOPTransformConfiguration())


#
# class NOOPTransformConfigurationRayLauncherConfiguration(RayLauncherConfiguration):
Expand All @@ -141,7 +146,7 @@ def apply_input_params(self, args: Namespace) -> bool:
#

if __name__ == "__main__":
#launcher = PythonTransformLauncher(transform_runtime_config=NOOPPythonLauncherConfigurationPython())
# launcher = PythonTransformLauncher(transform_runtime_config=NOOPPythonLauncherConfigurationPython())
launcher = PythonTransformLauncher(transform_runtime_config=NOOPTransformConfiguration())
logger.info("Launching noop transform")
launcher.launch()
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import argparse
from argparse import ArgumentParser
from typing import Any

from data_processing.transform import AbstractTableTransform
from data_processing.utils import CLIArgumentProvider
Expand All @@ -21,7 +22,7 @@ class TransformConfiguration(CLIArgumentProvider):
This is a base transform configuration class defining transform's input/output parameter
"""

def __init__(self, name:str, transform_class:AbstractTableTransform, remove_from_metadata: list[str] = []):
def __init__(self, name: str, transform_class: AbstractTableTransform, remove_from_metadata: list[str] = []):
"""
Initialization
"""
Expand All @@ -30,6 +31,29 @@ def __init__(self, name:str, transform_class:AbstractTableTransform, remove_from
self.remove_from_metadata = remove_from_metadata
self.params = {}


class TransformConfigurationProxy(TransformConfiguration):
def __init__(self, proxied_transform_config: TransformConfiguration):
self.proxied_transform_config = proxied_transform_config
# Python probably has a better way of doing this using the proxied transform config
self.name = proxied_transform_config.name
self.transform_class = proxied_transform_config.transform_class
self.remove_from_metadata = proxied_transform_config.remove_from_metadata
self.params = {}

def add_input_params(self, parser: argparse.ArgumentParser) -> None:
self.proxied_transform_config.add_input_params(parser)

def apply_input_params(self, args: argparse.Namespace) -> bool:
is_valid = self.proxied_transform_config.apply_input_params(args)
if is_valid:
self.params = self.proxied_transform_config.params
return is_valid

def get_input_params(self) -> dict[str, Any]:
return self.params


def get_transform_config(
transform_configuration: TransformConfiguration, argv: list[str], parser: ArgumentParser = None
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
import os
import sys

from data_processing.transform import TransformConfiguration
from data_processing.launch.ray import (
RayTransformLauncher,
from data_processing.launch.ray import RayTransformLauncher
from data_processing.launch.ray.transform_configuration import RayTransformConfiguration
from data_processing.test_support.transform import NOOPTransformConfiguration
from data_processing.test_support.transform.noop_transform import (
NOOPRayTransformConfiguration,
)
from data_processing.transform import AbstractTableTransform
from data_processing.transform import AbstractTableTransform, TransformConfiguration
from data_processing.utils import ParamsUtils


Expand All @@ -43,15 +45,13 @@
code_location = {"github": "github", "commit_hash": "12345", "path": "path"}


class TestingTransformConfiguration(TransformConfiguration):
def __init__(self):
super().__init__("test", transform_class=AbstractTableTransform)
class TestLauncherTransformLauncher(RayTransformLauncher):
"""
Test driver for validation of the functionality
"""

def __init__(self):
super().__init__( TestingTransformConfiguration())
super().__init__(NOOPRayTransformConfiguration())

def _submit_for_execution(self) -> int:
"""
Expand Down Expand Up @@ -89,8 +89,6 @@ def test_launcher():
sys.argv = ParamsUtils.dict_to_req(d=params)
res = TestLauncherTransformLauncher().launch()



assert 0 == res
# Add local config, should fail because now three different configs exist
params["data_local_config"] = ParamsUtils.convert_to_ast(local_conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@
import os

import pyarrow as pa

from data_processing.launch.ray import RayTransformLauncher
from data_processing.test_support.launch.transform_test import AbstractTransformLauncherTest
from data_processing.test_support.launch.transform_test import (
AbstractTransformLauncherTest,
)
from data_processing.test_support.transform import NOOPTransformConfiguration
from data_processing.test_support.transform.noop_transform import (
NOOPRayTransformConfiguration,
)


table = pa.Table.from_pydict({"name": pa.array(["Tom"]), "age": pa.array([23])})
expected_table = table # We're a noop after all.
Expand All @@ -32,6 +37,6 @@ class TestRayNOOPTransform(AbstractTransformLauncherTest):
def get_test_transform_fixtures(self) -> list[tuple]:
basedir = "../../../../test-data/data_processing/ray/noop/"
basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), basedir))
launcher = RayTransformLauncher(NOOPTransformConfiguration())
launcher = RayTransformLauncher(NOOPRayTransformConfiguration())
fixtures = [(launcher, {"noop_sleep_sec": 0}, basedir + "/input", basedir + "/expected")]
return fixtures
5 changes: 3 additions & 2 deletions transforms/code/code_quality/src/code_quality_local_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import sys
from pathlib import Path

from code_quality_transform import CodeQualityRayLauncher
from code_quality_transform import CodeQualityRayTransformConfiguration
from data_processing.launch.ray import RayTransformLauncher
from data_processing.utils import ParamsUtils


Expand All @@ -26,7 +27,7 @@
}

# create launcher
launcher = CodeQualityRayLauncher()
launcher = RayTransformLauncher(CodeQualityRayTransformConfiguration())


worker_options = {"num_cpus": 0.8}
Expand Down
5 changes: 3 additions & 2 deletions transforms/code/code_quality/src/code_quality_s3_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

import sys

from code_quality_transform import CodeQualityRayLauncher
from code_quality_transform import CodeQualityRayTransformConfiguration
from data_processing.launch.ray import RayTransformLauncher
from data_processing.utils import ParamsUtils


Expand All @@ -27,7 +28,7 @@
}

# create launcher
launcher = CodeQualityRayLauncher()
launcher = RayTransformLauncher(CodeQualityRayTransformConfiguration())


worker_options = {"num_cpus": 0.8}
Expand Down
32 changes: 6 additions & 26 deletions transforms/code/code_quality/src/code_quality_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import numpy as np
import pyarrow as pa
from bs4 import BeautifulSoup
from data_processing.transform import TransformConfiguration
from data_processing.launch.ray import RayTransformLauncher
from data_processing.transform import AbstractTableTransform
from data_processing.launch.ray.transform_configuration import RayTransformConfiguration
from data_processing.transform import AbstractTableTransform, TransformConfiguration
from data_processing.utils import TransformUtils
from transformers import AutoTokenizer

Expand Down Expand Up @@ -285,10 +285,7 @@ def transform(self, table: pa.Table) -> tuple[list[pa.Table], dict]:

class CodeQualityTransformConfiguration(TransformConfiguration):
def __init__(self):
super().__init__(
name="code_quality",
transform_class=CodeQualityTransform
)
super().__init__(name="code_quality", transform_class=CodeQualityTransform)

def add_input_params(self, parser: ArgumentParser) -> None:
parser.add_argument(
Expand Down Expand Up @@ -339,28 +336,11 @@ def apply_input_params(self, args: Namespace) -> bool:
return True


# class CodeQualityRayLauncherConfiguration(RayLauncherConfiguration):
# def __init__(self):
# super().__init__(
# name="code_quality",
# transform_class=CodeQualityTransform,
# launcher_configuration=CodeQualityTransformConfiguration(),
# )
#
#
# class CodeQualityPythonLauncherConfiguration(PythonLauncherConfiguration):
# def __init__(self):
# super().__init__(
# name="code_quality",
# transform_class=CodeQualityTransform,
# launcher_configuration=CodeQualityTransformConfiguration(),
# )
# self.base = CodeQualityTransformConfiguration()

class CodeQualityRayLauncher(RayTransformLauncher):
class CodeQualityRayTransformConfiguration(RayTransformConfiguration):
def __init__(self):
super().__init__(transform_config=CodeQualityTransformConfiguration())


if __name__ == "__main__":
launcher = CodeQualityRayLauncher()
launcher = RayTransformLauncher(CodeQualityRayTransformConfiguration())
launcher.launch()
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@

import os

from code_quality_transform import CodeQualityRayLauncher
from data_processing.test_support.launch.transform_test import AbstractTransformLauncherTest
from code_quality_transform import CodeQualityRayTransformConfiguration
from data_processing.launch.ray import RayTransformLauncher
from data_processing.test_support.launch.transform_test import (
AbstractTransformLauncherTest,
)


class TestCodeQualityTransform(AbstractTransformLauncherTest):
Expand All @@ -30,6 +33,6 @@ def get_test_transform_fixtures(self) -> list[tuple]:
}
basedir = "../test-data"
basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), basedir))
launcher = CodeQualityRayLauncher()
launcher = RayTransformLauncher(CodeQualityRayTransformConfiguration())
fixtures = [(launcher, cli, basedir + "/input", basedir + "/expected")]
return fixtures
6 changes: 4 additions & 2 deletions transforms/code/malware/src/malware_local_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import sys
from pathlib import Path

from data_processing.launch.ray import RayTransformLauncher
from data_processing.utils import ParamsUtils
from malware_transform import check_clamd, MalwareRayLauncher
from malware_transform import MalwareRayTransformConfiguration, check_clamd


TEST_SOCKET = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".tmp", "clamd.ctl"))
# create parameters
Expand Down Expand Up @@ -51,6 +53,6 @@
# Set the simulated command line args
sys.argv = ParamsUtils.dict_to_req(d=params | malware_params)
# create launcher
launcher = MalwareRayLauncher()
launcher = RayTransformLauncher(MalwareRayTransformConfiguration())
# Launch the ray actor(s) to process the input
launcher.launch()
Loading