In [7]:
# Extract kfp_eval_samples.zip into the files-kfp dataset folder.
# NOTE(review): the recorded output of this cell reports that kfp_eval_samples.zip could not be
# found in the working directory — confirm where the archive actually lives.
# NOTE(review): presumably /kaggle/input is read-only, in which case the extraction target would
# need to be a writable directory — verify before relying on this cell.
!unzip kfp_eval_samples.zip -d /kaggle/input/files-kfp/kfp_eval_samples

unzip:  cannot find or open kfp_eval_samples.zip, kfp_eval_samples.zip.zip or kfp_eval_samples.zip.ZIP.


In [19]:
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install -q kfp


In [20]:
from pathlib import Path

# Directory holding the evaluation samples to be counted.
output_dir = Path("/kaggle/input/grok-kfp-data/kfp_eval_samples")
# glob("*") also yields sub-directories, so keep only regular files to match the
# "Total files" message printed below.
file_count = sum(1 for entry in output_dir.glob("*") if entry.is_file())
print(f"📁 Total files in directory: {file_count}")


📁 Total files in directory: 9


In [21]:
# %pip installs into the environment of the running kernel.
# pytest-timeout provides the @pytest.mark.timeout marker used by the tests in this notebook;
# without the plugin the marker is unknown to pytest and no time limit is enforced.
%pip install -q pytest pytest-timeout




In [25]:
import importlib.util
import inspect
import logging
import os
import tempfile
import traceback
import typing
from pathlib import Path
from typing import Callable, Optional

import pytest
from kfp import dsl, compiler

# --- Logging setup ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Directory with generated .py files ---
# The KFP_PIPELINE_DIR environment variable may point the tests at another sample set without
# editing the code; when it is unset the original dataset path is used.
PIPELINE_DIR = Path(
    os.environ.get("KFP_PIPELINE_DIR", "/kaggle/input/enhanced-data-cht/kfp_eval_samples_2")
)

# Validate directory existence (fails at import/collection time, before any test runs)
if not PIPELINE_DIR.exists():
    raise FileNotFoundError(f"Pipeline directory {PIPELINE_DIR} does not exist")

# Sorted so the parametrized test ids (py_file0, py_file1, ...) refer to the same file on every
# run; the order in which glob yields entries is not guaranteed.
pipeline_files = sorted(PIPELINE_DIR.glob("*.py"))
if not pipeline_files:
    raise FileNotFoundError(f"No .py files found in {PIPELINE_DIR}")


# === Function to load @dsl.pipeline ===
def get_pipeline_func(py_file: Path) -> Optional[Callable]:
    """Import ``py_file`` as a module and return its first pipeline-like callable.

    The file is executed under the module name ``py_file.stem``. Its namespace is then
    scanned in insertion order and the first callable carrying a ``_component_spec``
    attribute is returned.

    NOTE(review): ``_component_spec`` is a private attribute — confirm that the installed
    kfp version sets it on ``@dsl.pipeline`` objects, and that components do not carry it
    too (otherwise the first match is not necessarily the pipeline).

    Args:
        py_file: Path of the Python source file to import.

    Returns:
        The first matching callable, or ``None`` when nothing matches or when importing
        or scanning the file raised (the error is logged and the traceback printed).
    """
    try:
        module_spec = importlib.util.spec_from_file_location(py_file.stem, py_file)
        loaded = importlib.util.module_from_spec(module_spec)
        # Runs the file's top-level code; anything it raises is handled below.
        module_spec.loader.exec_module(loaded)

        candidates = []
        for member in vars(loaded).values():
            if callable(member) and hasattr(member, "_component_spec"):
                candidates.append(member)
    except Exception as e:
        logger.error(f"Error loading {py_file.name}: {e}")
        traceback.print_exc()
        return None

    if not candidates:
        return None
    return candidates[0]


# === Function to validate presence of @component decorators ===
def validate_component_decorators(module) -> bool:
    """Scan a loaded module for component-like callables and warn when there are none.

    A callable counts as a component when it has a ``_component_human_name`` attribute.
    The check is advisory: a module without any such callable only produces a warning
    and is still reported as valid.

    NOTE(review): ``_component_human_name`` is a private attribute — confirm that the
    installed kfp version sets it on ``@component``-decorated functions.

    Args:
        module: Module object whose namespace is inspected.

    Returns:
        ``True`` when the scan completed (with or without components found);
        ``False`` only when inspecting the module raised an exception.
    """
    try:
        components = [
            obj for obj in vars(module).values()
            if callable(obj) and hasattr(obj, "_component_human_name")
        ]
        if not components:
            # Deliberately soft: pipelines may be built without locally decorated components.
            logger.warning(f"⚠️ No @component found in {getattr(module, '__file__', 'unknown module')}")
        return True
    except Exception as e:
        logger.error(f"Error validating components: {e}")
        return False


# === Optional check for external dependencies ===
def check_external_dependencies(py_file: Path) -> bool:
    """Report whether a source file mentions one of the known cloud services.

    This is a plain, case-insensitive substring scan of the raw file text, so a match
    inside a comment or a string literal counts as well.

    Args:
        py_file: Path of the source file to scan.

    Returns:
        ``True`` if any of ``snowflake``, ``google.cloud``, ``bigquery`` or ``gs://``
        occurs in the file, ``False`` otherwise.
    """
    # Decode explicitly as UTF-8 and replace undecodable bytes: the scan must not depend on
    # the platform's default encoding nor abort on a stray non-UTF-8 byte.
    with open(py_file, 'r', encoding='utf-8', errors='replace') as f:
        content = f.read().lower()  # lower-case once instead of once per keyword
    keywords = ('snowflake', 'google.cloud', 'bigquery', 'gs://')
    return any(kw in content for kw in keywords)


# === TEST 1: Pipeline Compilation ===
# NOTE(review): the `timeout` marker comes from the pytest-timeout plugin — confirm it is
# installed, otherwise the 60 s limit is not enforced.
@pytest.mark.timeout(60)
@pytest.mark.parametrize("py_file", pipeline_files)
def test_pipeline_compilation(py_file: Path) -> None:
    """Test that the pipeline compiles successfully to YAML.

    Steps:
      1. Skip files that reference external cloud services.
      2. Skip files in which no pipeline function can be found.
      3. Load the module and run the advisory component check.
      4. Compile the pipeline into a temporary directory and assert the YAML was produced.

    Args:
        py_file: Pipeline source file supplied by the parametrization.
    """
    # Static text scan first, so files that talk to cloud services are skipped before any of
    # their top-level code is executed by the loaders below.
    if check_external_dependencies(py_file):
        logger.warning(f"⚠️ Skipping {py_file.name} due to cloud dependencies")
        pytest.skip(f"{py_file.name} has external services")

    pipeline_func = get_pipeline_func(py_file)
    if pipeline_func is None:
        pytest.skip(f"⏭️ No @dsl.pipeline function found in {py_file.name}")

    # Load module and validate components
    spec = importlib.util.spec_from_file_location(py_file.stem, py_file)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    assert validate_component_decorators(module), f"Invalid @component decorators in {py_file.name}"

    # Compile into a throw-away directory rather than next to the input file: the sample
    # directory is input data, and a pre-existing .yaml beside a sample must not be
    # overwritten and deleted by the test.
    with tempfile.TemporaryDirectory() as tmp_dir:
        yaml_path = os.path.join(tmp_dir, f"{py_file.stem}.yaml")
        try:
            compiler.Compiler().compile(pipeline_func, yaml_path)
        except Exception as e:
            logger.error(f"❌ Compilation failed: {e}")
            traceback.print_exc()
            pytest.fail(f"Compilation failed for {py_file.name}: {e}")

        # Kept outside the try block so a missing artifact is reported as an assertion
        # failure, not as a compilation error.
        assert os.path.exists(yaml_path), f"YAML not created for {py_file.name}"
        logger.info(f"✅ Compiled {py_file.name} → {yaml_path}")

    # The temporary directory (and the YAML in it) is removed when the with-block exits.
    logger.info(f"🧹 Cleaned up {yaml_path}")


# === TEST 2: Parameter Type Validation ===
def _is_supported_param_type(annotation, valid_types) -> bool:
    """Return True when ``annotation`` is acceptable as a pipeline parameter type.

    Accepted are: members of ``valid_types``, any class, generic aliases whose container
    type is accepted (e.g. ``List[str]``, ``Dict[str, int]``), and ``Optional``/``Union``
    annotations whose non-None members are all accepted.
    """
    if annotation in valid_types or isinstance(annotation, type):
        return True
    origin = typing.get_origin(annotation)
    if origin is typing.Union:
        # Optional[X] is Union[X, None]: every non-None member must itself be supported.
        return all(
            arg is type(None) or _is_supported_param_type(arg, valid_types)
            for arg in typing.get_args(annotation)
        )
    # Generic alias: judge it by its container type (list for List[str], dict for Dict[...]).
    return origin is not None and _is_supported_param_type(origin, valid_types)


@pytest.mark.parametrize("py_file", pipeline_files)
def test_pipeline_parameters(py_file: Path) -> None:
    """Validate parameter types for @dsl.pipeline functions.

    Every annotated parameter of the pipeline function must have a supported type (see
    ``_is_supported_param_type``). Parameters without an annotation only produce a warning.

    Args:
        py_file: Pipeline source file supplied by the parametrization.
    """
    pipeline_func = get_pipeline_func(py_file)
    if pipeline_func is None:
        pytest.skip(f"⏭️ No @dsl.pipeline function found in {py_file.name}")

    sig = inspect.signature(pipeline_func)
    valid_types = (str, int, float, bool, list, dict)

    # Resolve string annotations (e.g. from `from __future__ import annotations`) into real
    # types. Best effort: if resolution fails, fall back to the raw annotations.
    try:
        resolved_hints = typing.get_type_hints(pipeline_func)
    except Exception:
        resolved_hints = {}

    for param_name, param in sig.parameters.items():
        param_type = resolved_hints.get(param_name, param.annotation)
        if param_type is inspect.Parameter.empty:
            logger.warning(f"⚠️ Parameter '{param_name}' in {py_file.name} lacks type annotation")
            continue  # nothing was validated, so do not log it as validated
        if not _is_supported_param_type(param_type, valid_types):
            pytest.fail(f"❌ Invalid type '{param_type}' for parameter '{param_name}' in {py_file.name}")
        logger.info(f"✅ {py_file.name}: Validated param '{param_name}' ({param_type})")


In [26]:
# Run the test module quietly, stopping after 10 failures and hiding warnings.
# NOTE(review): the recorded output below shows a test body (`assert pipeline_func is not None`)
# and an input directory (/kaggle/input/files-kfp/...) that differ from the test cell in this
# notebook — presumably test_kfp_pipelines.py on disk is a stale copy. Nothing visible here
# writes that file; confirm it is regenerated from the test cell (e.g. via %%writefile) before
# this command runs.
!pytest test_kfp_pipelines.py --maxfail=10 --disable-warnings -q


[31mF[0m[31mF[0m[31mF[0m[31mF[0m[31mF[0m[31mF[0m[31mF[0m[31mF[0m[31mF[0m[31mF[0m
[31m[1m_____________________ test_pipeline_compilation[py_file0] ______________________[0m

py_file = PosixPath('/kaggle/input/files-kfp/kfp_eval_samples/muhammadyaseen__kubeflow-on-linode__minio_census_pipeline.py')

    [0m[37m@pytest[39;49;00m.mark.parametrize([33m"[39;49;00m[33mpy_file[39;49;00m[33m"[39;49;00m, pipeline_files)[90m[39;49;00m
    [94mdef[39;49;00m[90m [39;49;00m[92mtest_pipeline_compilation[39;49;00m(py_file):[90m[39;49;00m
        pipeline_func = get_pipeline_func(py_file)[90m[39;49;00m
>       [94massert[39;49;00m pipeline_func [95mis[39;49;00m [95mnot[39;49;00m [94mNone[39;49;00m, [33mf[39;49;00m[33m"[39;49;00m[33mNo @dsl.pipeline function found in [39;49;00m[33m{[39;49;00mpy_file.name[33m}[39;49;00m[33m"[39;49;00m[90m[39;49;00m
[1m[31mE       AssertionError: No @dsl.pipeline function found in muhammadyaseen__kubeflow