Skip to content

Commit

Permalink
bug fixing
Browse files Browse the repository at this point in the history
  • Loading branch information
blublinsky committed Jun 21, 2024
1 parent 468c268 commit b9f3aab
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 28 deletions.
48 changes: 24 additions & 24 deletions transforms/code/inputcode2parquet/kfp_ray/inputcode2parquet_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ def compute_exec_params_func(
runtime_pipeline_id: str,
runtime_job_id: str,
runtime_code_location: str,
code2parquet_supported_langs_file: str,
code2parquet_domain: str,
code2parquet_snapshot: str,
code2parquet_detect_programming_lang: bool,
inputcode2parquet_supported_langs_file: str,
inputcode2parquet_domain: str,
inputcode2parquet_snapshot: str,
inputcode2parquet_detect_programming_lang: bool,
) -> dict:
from runtime_utils import KFPUtils

Expand All @@ -59,10 +59,10 @@ def compute_exec_params_func(
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": runtime_job_id,
"runtime_code_location": runtime_code_location,
"code2parquet_supported_langs_file": code2parquet_supported_langs_file,
"code2parquet_domain": code2parquet_domain,
"code2parquet_snapshot": code2parquet_snapshot,
"code2parquet_detect_programming_lang": code2parquet_detect_programming_lang,
"inputcode2parquet_supported_langs_file": inputcode2parquet_supported_langs_file,
"inputcode2parquet_domain": inputcode2parquet_domain,
"inputcode2parquet_snapshot": inputcode2parquet_snapshot,
"inputcode2parquet_detect_programming_lang": inputcode2parquet_detect_programming_lang,
}


Expand Down Expand Up @@ -106,7 +106,7 @@ def compute_exec_params_func(
description="Pipeline for converting zip files to parquet",
)
def inputcode2parquet(
ray_name: str = "code2parquet-kfp-ray", # name of Ray cluster
ray_name: str = "inputcode2parquet-kfp-ray", # name of Ray cluster
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }',
ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, '
Expand All @@ -123,11 +123,11 @@ def inputcode2parquet(
runtime_pipeline_id: str = "pipeline_id",
runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}",
# code to parquet
code2parquet_supported_langs_file: str = "test/inputcode2parquet/languages/lang_extensions.json",
code2parquet_detect_programming_lang: bool = True,
code2parquet_domain: str = "code",
code2parquet_snapshot: str = "github",
code2parquet_s3_access_secret: str = "s3-secret",
inputcode2parquet_supported_langs_file: str = "test/inputcode2parquet/languages/lang_extensions.json",
inputcode2parquet_detect_programming_lang: bool = True,
inputcode2parquet_domain: str = "code",
inputcode2parquet_snapshot: str = "github",
inputcode2parquet_s3_access_secret: str = "s3-secret",
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
) -> None:
Expand Down Expand Up @@ -163,11 +163,11 @@ def inputcode2parquet(
:param runtime_actor_options - actor options
:param runtime_pipeline_id - pipeline id
:param runtime_code_location - code location
:param code2parquet_supported_langs_file - file to store allowed languages
:param code2parquet_detect_programming_lang - detect programming language flag
:param code2parquet_domain: domain
:param code2parquet_snapshot: snapshot
:param code2parquet_s3_access_secret - ingest to parquet s3 access secret
:param inputcode2parquet_supported_langs_file - file to store allowed languages
:param inputcode2parquet_detect_programming_lang - detect programming language flag
:param inputcode2parquet_domain: domain
:param inputcode2parquet_snapshot: snapshot
:param inputcode2parquet_s3_access_secret - ingest to parquet s3 access secret
(here we are assuming that select language info is in S3, but potentially in the different bucket)
:return: None
"""
Expand All @@ -187,10 +187,10 @@ def inputcode2parquet(
runtime_pipeline_id=runtime_pipeline_id,
runtime_job_id=run_id,
runtime_code_location=runtime_code_location,
code2parquet_supported_langs_file=code2parquet_supported_langs_file,
code2parquet_domain=code2parquet_domain,
code2parquet_snapshot=code2parquet_snapshot,
code2parquet_detect_programming_lang=code2parquet_detect_programming_lang,
inputcode2parquet_supported_langs_file=inputcode2parquet_supported_langs_file,
inputcode2parquet_domain=inputcode2parquet_domain,
inputcode2parquet_snapshot=inputcode2parquet_snapshot,
inputcode2parquet_detect_programming_lang=inputcode2parquet_detect_programming_lang,
)
ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
# start Ray cluster
Expand All @@ -217,7 +217,7 @@ def inputcode2parquet(
)
ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC)
ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret)
ComponentUtils.set_s3_env_vars_to_component(execute_job, code2parquet_s3_access_secret, prefix=PREFIX)
ComponentUtils.set_s3_env_vars_to_component(execute_job, inputcode2parquet_s3_access_secret, prefix=PREFIX)
execute_job.after(ray_cluster)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from data_processing.utils import TransformUtils, str2bool


shortname = "code2parquet"
shortname = "inputcode2parquet"
cli_prefix = f"{shortname}_"
supported_langs_file_key = f"{shortname}_supported_langs_file"
supported_languages = f"{shortname}_supported_languages"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class TestIngestToParquetTransform(AbstractBinaryTransformTest):
"""

def get_test_transform_fixtures(self) -> list[tuple]:
basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../ray/test-data"))
basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data"))
lang_supported_file = os.path.abspath(os.path.join(basedir, "languages/lang_extensions.json"))
input_dir = os.path.join(basedir, "input")
input_files = get_files_in_folder(input_dir, ".zip")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ def get_test_transform_fixtures(self) -> list[tuple]:
(
PythonTransformLauncher(CodeToParquetPythonConfiguration()),
config,
basedir + "/input",
basedir + "/expected",
os.path.join(basedir, "input"),
os.path.join(basedir, "expected"),
# this is added as a fixture to remove these 2 columns from comparison
["size", "date_acquired"],
)
Expand Down

0 comments on commit b9f3aab

Please sign in to comment.