In [15]:
import kfp
import requests
from urllib.parse import urljoin
import re

#WORKING
HOST='http://hua-kubeflow.ddns.net/'
USERNAME = "user@example.com"
PASSWORD = "2LZHseTdrLFFvx"
NAMESPACE = "kubeflow-user-example-com"

HOST = 'http://huanew-kubeflow.ddns.net/pipeline'
USERNAME = "gfatouros@innov-acts.com"
PASSWORD = "innov@123"
NAMESPACE = "innov"

#  Kubeflow connection settings
KUBEFLOW_HOST = 'http://hua-kubeflow.ddns.net/'
KUBEFLOW_USERNAME = "gfatouros@innov-acts.com"
KUBEFLOW_PASSWORD = "innov@123"
KUBEFLOW_NAMESPACE = "innov"



#print(client.get_recurring_run("2fc815d5-6861-4241-b3d6-5bcdb5445b0b"))

In [14]:
class KFPClientManager:
    """
    A class that creates `kfp.Client` instances with Dex authentication.
    """

    def __init__(
        self,
        api_url: str,
        dex_username: str,
        dex_password: str,
        dex_auth_type: str = "local",
        skip_tls_verify: bool = False,
    ):
        """
        Initialize the KfpClient

        :param api_url: the Kubeflow Pipelines API URL
        :param skip_tls_verify: if True, skip TLS verification
        :param dex_username: the Dex username
        :param dex_password: the Dex password
        :param dex_auth_type: the auth type to use if Dex has multiple enabled, one of: ['ldap', 'local']
        """
        self._api_url = api_url
        self._skip_tls_verify = skip_tls_verify
        self._dex_username = dex_username
        self._dex_password = dex_password
        self._dex_auth_type = dex_auth_type
        self._client = None

        # disable SSL verification, if requested
        if self._skip_tls_verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # ensure `dex_default_auth_type` is valid
        if self._dex_auth_type not in ["ldap", "local"]:
            raise ValueError(
                f"Invalid `dex_auth_type` '{self._dex_auth_type}', must be one of: ['ldap', 'local']"
            )

    def _get_session_cookies(self) -> str:
        """
        Get the session cookies by authenticating against Dex
        :return: a string of session cookies in the form "key1=value1; key2=value2"
        """

        # use a persistent session (for cookies)
        s = requests.Session()

        # GET the api_url, which should redirect to Dex
        resp = s.get(
            self._api_url, allow_redirects=True, verify=not self._skip_tls_verify
        )
        if resp.status_code == 200:
            pass
        elif resp.status_code == 403:
            # if we get 403, we might be at the oauth2-proxy sign-in page
            # the default path to start the sign-in flow is `/oauth2/start?rd=<url>`
            url_obj = urlsplit(resp.url)
            url_obj = url_obj._replace(
                path="/oauth2/start", query=urlencode({"rd": url_obj.path})
            )
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
        else:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for GET against: {self._api_url}"
            )

        # if we were NOT redirected, then the endpoint is unsecured
        if len(resp.history) == 0:
            # no cookies are needed
            return ""

        # if we are at `../auth` path, we need to select an auth type
        url_obj = urlsplit(resp.url)
        if re.search(r"/auth$", url_obj.path):
            url_obj = url_obj._replace(
                path=re.sub(r"/auth$", f"/auth/{self._dex_auth_type}", url_obj.path)
            )

        # if we are at `../auth/xxxx/login` path, then we are at the login page
        if re.search(r"/auth/.*/login$", url_obj.path):
            dex_login_url = url_obj.geturl()
        else:
            # otherwise, we need to follow a redirect to the login page
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for GET against: {url_obj.geturl()}"
                )
            dex_login_url = resp.url

        # attempt Dex login
        resp = s.post(
            dex_login_url,
            data={"login": self._dex_username, "password": self._dex_password},
            allow_redirects=True,
            verify=not self._skip_tls_verify,
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for POST against: {dex_login_url}"
            )

        # if we were NOT redirected, then the login credentials were probably invalid
        if len(resp.history) == 0:
            raise RuntimeError(
                f"Login credentials are probably invalid - "
                f"No redirect after POST to: {dex_login_url}"
            )

        # if we are at `../approval` path, we need to approve the login
        url_obj = urlsplit(resp.url)
        if re.search(r"/approval$", url_obj.path):
            dex_approval_url = url_obj.geturl()

            # approve the login
            resp = s.post(
                dex_approval_url,
                data={"approval": "approve"},
                allow_redirects=True,
                verify=not self._skip_tls_verify,
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for POST against: {url_obj.geturl()}"
                )

        return "; ".join([f"{c.name}={c.value}" for c in s.cookies])

    def _create_kfp_client(self) -> kfp.Client:
        try:
            session_cookies = self._get_session_cookies()
        except Exception as ex:
            raise RuntimeError(f"Failed to get Dex session cookies") from ex

        # monkey patch the kfp.Client to support disabling SSL verification
        # kfp only added support in v2: https://github.com/kubeflow/pipelines/pull/7174
        original_load_config = kfp.Client._load_config

        def patched_load_config(client_self, *args, **kwargs):
            config = original_load_config(client_self, *args, **kwargs)
            config.verify_ssl = not self._skip_tls_verify
            return config

        patched_kfp_client = kfp.Client
        patched_kfp_client._load_config = patched_load_config

        return patched_kfp_client(
            host=self._api_url,
            cookies=session_cookies,
        )

    def create_kfp_client(self) -> kfp.Client:
        """Get a newly authenticated Kubeflow Pipelines client."""
        return self._create_kfp_client()
            

In [20]:
kfp_client_manager = KFPClientManager(
    api_url=KUBEFLOW_HOST,
    skip_tls_verify=True,
    dex_username=KUBEFLOW_USERNAME,
    dex_password=KUBEFLOW_PASSWORD,
    # can be 'ldap' or 'local' depending on your Dex configuration
    dex_auth_type="local",
)

# get a newly authenticated KFP client
# TIP: long-lived sessions might need to get a new client when their session expires
kfp_client = kfp_client_manager.create_kfp_client()

# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace=KUBEFLOW_NAMESPACE)
print(experiments)

{'experiments': None, 'next_page_token': None, 'total_size': None}




In [21]:
def get_kubeflow_client() -> kfp.Client:
    kfp_client_manager = KFPClientManager(
        api_url=os.getenv("KUBEFLOW_HOST") + "/pipeline",
        skip_tls_verify=True,

        dex_username=os.getenv("KUBEFLOW_USERNAME"),
        dex_password=os.getenv("KUBEFLOW_PASSWORD"),

        # can be 'ldap' or 'local' depending on your Dex configuration
        dex_auth_type="local",
        )

    kfp_client = kfp_client_manager.create_kfp_client()
    return kfp_client

In [22]:
kf_client = get_kubeflow_client()
pipelines = kf_client.list_pipelines()
print(pipelines)
        


RuntimeError: Failed to get Dex session cookies

In [10]:
from utils.helper_functions import get_kubeflow_client
import os

In [11]:
client = get_kubeflow_client()

RuntimeError: Failed to get Dex session cookies

In [16]:
#from official docs: https://www.kubeflow.org/docs/components/pipelines/user-guides/core-functions/connect-api/
import re
from urllib.parse import urlsplit, urlencode

import kfp
import requests
import urllib3


class KFPClientManager:
    """
    A class that creates `kfp.Client` instances with Dex authentication.
    """

    def __init__(
        self,
        api_url: str,
        dex_username: str,
        dex_password: str,
        dex_auth_type: str = "local",
        skip_tls_verify: bool = False,
    ):
        """
        Initialize the KfpClient

        :param api_url: the Kubeflow Pipelines API URL
        :param skip_tls_verify: if True, skip TLS verification
        :param dex_username: the Dex username
        :param dex_password: the Dex password
        :param dex_auth_type: the auth type to use if Dex has multiple enabled, one of: ['ldap', 'local']
        """
        self._api_url = api_url
        self._skip_tls_verify = skip_tls_verify
        self._dex_username = dex_username
        self._dex_password = dex_password
        self._dex_auth_type = dex_auth_type
        self._client = None

        # disable SSL verification, if requested
        if self._skip_tls_verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # ensure `dex_default_auth_type` is valid
        if self._dex_auth_type not in ["ldap", "local"]:
            raise ValueError(
                f"Invalid `dex_auth_type` '{self._dex_auth_type}', must be one of: ['ldap', 'local']"
            )

    def _get_session_cookies(self) -> str:
        """
        Get the session cookies by authenticating against Dex
        :return: a string of session cookies in the form "key1=value1; key2=value2"
        """

        # use a persistent session (for cookies)
        s = requests.Session()

        # GET the api_url, which should redirect to Dex
        resp = s.get(
            self._api_url, allow_redirects=True, verify=not self._skip_tls_verify
        )
        if resp.status_code == 200:
            pass
        elif resp.status_code == 403:
            # if we get 403, we might be at the oauth2-proxy sign-in page
            # the default path to start the sign-in flow is `/oauth2/start?rd=<url>`
            url_obj = urlsplit(resp.url)
            url_obj = url_obj._replace(
                path="/oauth2/start", query=urlencode({"rd": url_obj.path})
            )
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
        else:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for GET against: {self._api_url}"
            )

        # if we were NOT redirected, then the endpoint is unsecured
        if len(resp.history) == 0:
            # no cookies are needed
            return ""

        # if we are at `../auth` path, we need to select an auth type
        url_obj = urlsplit(resp.url)
        if re.search(r"/auth$", url_obj.path):
            url_obj = url_obj._replace(
                path=re.sub(r"/auth$", f"/auth/{self._dex_auth_type}", url_obj.path)
            )

        # if we are at `../auth/xxxx/login` path, then we are at the login page
        if re.search(r"/auth/.*/login$", url_obj.path):
            dex_login_url = url_obj.geturl()
        else:
            # otherwise, we need to follow a redirect to the login page
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for GET against: {url_obj.geturl()}"
                )
            dex_login_url = resp.url

        # attempt Dex login
        resp = s.post(
            dex_login_url,
            data={"login": self._dex_username, "password": self._dex_password},
            allow_redirects=True,
            verify=not self._skip_tls_verify,
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for POST against: {dex_login_url}"
            )

        # if we were NOT redirected, then the login credentials were probably invalid
        if len(resp.history) == 0:
            raise RuntimeError(
                f"Login credentials are probably invalid - "
                f"No redirect after POST to: {dex_login_url}"
            )

        # if we are at `../approval` path, we need to approve the login
        url_obj = urlsplit(resp.url)
        if re.search(r"/approval$", url_obj.path):
            dex_approval_url = url_obj.geturl()

            # approve the login
            resp = s.post(
                dex_approval_url,
                data={"approval": "approve"},
                allow_redirects=True,
                verify=not self._skip_tls_verify,
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for POST against: {url_obj.geturl()}"
                )

        return "; ".join([f"{c.name}={c.value}" for c in s.cookies])

    def _create_kfp_client(self) -> kfp.Client:
        try:
            session_cookies = self._get_session_cookies()
        except Exception as ex:
            raise RuntimeError(f"Failed to get Dex session cookies") from ex

        # monkey patch the kfp.Client to support disabling SSL verification
        # kfp only added support in v2: https://github.com/kubeflow/pipelines/pull/7174
        original_load_config = kfp.Client._load_config

        def patched_load_config(client_self, *args, **kwargs):
            config = original_load_config(client_self, *args, **kwargs)
            config.verify_ssl = not self._skip_tls_verify
            return config

        patched_kfp_client = kfp.Client
        patched_kfp_client._load_config = patched_load_config

        return patched_kfp_client(
            host=self._api_url,
            cookies=session_cookies,
        )

    def create_kfp_client(self) -> kfp.Client:
        """Get a newly authenticated Kubeflow Pipelines client."""
        return self._create_kfp_client()

HOST = 'https://hua-kubeflow.ddns.net/pipeline'
USERNAME = "gfatouros@innov-acts.com"
PASSWORD = "innov@123"
NAMESPACE = "innov"

kfp_client_manager = KFPClientManager(
    api_url=HOST,
    skip_tls_verify=True,
    dex_username=USERNAME,
    dex_password=PASSWORD,
    # can be 'ldap' or 'local' depending on your Dex configuration
    dex_auth_type="local",
)

# get a newly authenticated KFP client
# TIP: long-lived sessions might need to get a new client when their session expires
kfp_client = kfp_client_manager.create_kfp_client()

# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace=NAMESPACE)
print(experiments)

RuntimeError: Failed to get Dex session cookies

In [8]:
experiments = client.list_experiments()
print(experiments)

ApiException: (401)
Reason: Unauthorized
HTTP response headers: HTTPHeaderDict({'content-type': 'application/json', 'content-length': '2', 'date': 'Tue, 15 Apr 2025 09:25:22 GMT', 'server': 'istio-envoy'})
HTTP response body: {}


In [7]:
pipelines = client.list_pipelines()
print(pipelines)

ApiException: (401)
Reason: Unauthorized
HTTP response headers: HTTPHeaderDict({'content-type': 'application/json', 'content-length': '2', 'date': 'Tue, 15 Apr 2025 09:25:16 GMT', 'server': 'istio-envoy'})
HTTP response body: {}


In [4]:
#from official docs: https://www.kubeflow.org/docs/components/pipelines/user-guides/core-functions/connect-api/
import re
from urllib.parse import urlsplit, urlencode

import kfp
import requests
import urllib3


class KFPClientManager:
    """
    A class that creates `kfp.Client` instances with Dex authentication.
    """

    def __init__(
        self,
        api_url: str,
        dex_username: str,
        dex_password: str,
        dex_auth_type: str = "local",
        skip_tls_verify: bool = False,
    ):
        """
        Initialize the KfpClient

        :param api_url: the Kubeflow Pipelines API URL
        :param skip_tls_verify: if True, skip TLS verification
        :param dex_username: the Dex username
        :param dex_password: the Dex password
        :param dex_auth_type: the auth type to use if Dex has multiple enabled, one of: ['ldap', 'local']
        """
        self._api_url = api_url
        self._skip_tls_verify = skip_tls_verify
        self._dex_username = dex_username
        self._dex_password = dex_password
        self._dex_auth_type = dex_auth_type
        self._client = None

        # disable SSL verification, if requested
        if self._skip_tls_verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # ensure `dex_default_auth_type` is valid
        if self._dex_auth_type not in ["ldap", "local"]:
            raise ValueError(
                f"Invalid `dex_auth_type` '{self._dex_auth_type}', must be one of: ['ldap', 'local']"
            )

    def _get_session_cookies(self) -> str:
        """
        Get the session cookies by authenticating against Dex
        :return: a string of session cookies in the form "key1=value1; key2=value2"
        """

        # use a persistent session (for cookies)
        s = requests.Session()

        # GET the api_url, which should redirect to Dex
        resp = s.get(
            self._api_url, allow_redirects=True, verify=not self._skip_tls_verify
        )
        if resp.status_code == 200:
            pass
        elif resp.status_code == 403:
            # if we get 403, we might be at the oauth2-proxy sign-in page
            # the default path to start the sign-in flow is `/oauth2/start?rd=<url>`
            url_obj = urlsplit(resp.url)
            url_obj = url_obj._replace(
                path="/oauth2/start", query=urlencode({"rd": url_obj.path})
            )
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
        else:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for GET against: {self._api_url}"
            )

        # if we were NOT redirected, then the endpoint is unsecured
        if len(resp.history) == 0:
            # no cookies are needed
            return ""

        # if we are at `../auth` path, we need to select an auth type
        url_obj = urlsplit(resp.url)
        if re.search(r"/auth$", url_obj.path):
            url_obj = url_obj._replace(
                path=re.sub(r"/auth$", f"/auth/{self._dex_auth_type}", url_obj.path)
            )

        # if we are at `../auth/xxxx/login` path, then we are at the login page
        if re.search(r"/auth/.*/login$", url_obj.path):
            dex_login_url = url_obj.geturl()
        else:
            # otherwise, we need to follow a redirect to the login page
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for GET against: {url_obj.geturl()}"
                )
            dex_login_url = resp.url

        # attempt Dex login
        resp = s.post(
            dex_login_url,
            data={"login": self._dex_username, "password": self._dex_password},
            allow_redirects=True,
            verify=not self._skip_tls_verify,
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for POST against: {dex_login_url}"
            )

        # if we were NOT redirected, then the login credentials were probably invalid
        if len(resp.history) == 0:
            raise RuntimeError(
                f"Login credentials are probably invalid - "
                f"No redirect after POST to: {dex_login_url}"
            )

        # if we are at `../approval` path, we need to approve the login
        url_obj = urlsplit(resp.url)
        if re.search(r"/approval$", url_obj.path):
            dex_approval_url = url_obj.geturl()

            # approve the login
            resp = s.post(
                dex_approval_url,
                data={"approval": "approve"},
                allow_redirects=True,
                verify=not self._skip_tls_verify,
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for POST against: {url_obj.geturl()}"
                )

        return "; ".join([f"{c.name}={c.value}" for c in s.cookies])

    def _create_kfp_client(self) -> kfp.Client:
        try:
            session_cookies = self._get_session_cookies()
        except Exception as ex:
            raise RuntimeError(f"Failed to get Dex session cookies") from ex

        # monkey patch the kfp.Client to support disabling SSL verification
        # kfp only added support in v2: https://github.com/kubeflow/pipelines/pull/7174
        original_load_config = kfp.Client._load_config

        def patched_load_config(client_self, *args, **kwargs):
            config = original_load_config(client_self, *args, **kwargs)
            config.verify_ssl = not self._skip_tls_verify
            return config

        patched_kfp_client = kfp.Client
        patched_kfp_client._load_config = patched_load_config

        return patched_kfp_client(
            host=self._api_url,
            cookies=session_cookies,
        )

    def create_kfp_client(self) -> kfp.Client:
        """Get a newly authenticated Kubeflow Pipelines client."""
        return self._create_kfp_client()

HOST = 'http://huanew-kubeflow.ddns.net/pipeline'
USERNAME = "gfatouros@innov-acts.com"
PASSWORD = "innov@123"
NAMESPACE = "innov"

kfp_client_manager = KFPClientManager(
    api_url=HOST,
    skip_tls_verify=True,
    dex_username=USERNAME,
    dex_password=PASSWORD,
    # can be 'ldap' or 'local' depending on your Dex configuration
    dex_auth_type="local",
)

# get a newly authenticated KFP client
# TIP: long-lived sessions might need to get a new client when their session expires
kfp_client = kfp_client_manager.create_kfp_client()

# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace=NAMESPACE)
print(experiments)



{'experiments': None, 'next_page_token': None, 'total_size': None}


In [6]:
run

{'created_at': datetime.datetime(2025, 4, 11, 22, 29, 8, tzinfo=tzutc()),
 'description': None,
 'display_name': 'diabetes-classification-pipeline-rerun-2025-04-12',
 'error': None,
 'experiment_id': 'f9925fee-9c05-4125-a737-4d608200e59b',
 'finished_at': datetime.datetime(2025, 4, 11, 22, 30, 49, tzinfo=tzutc()),
 'pipeline_spec': None,
 'pipeline_version_id': None,
 'pipeline_version_reference': {'pipeline_id': '58d4509f-970b-4979-9706-15c2e9dacc19',
                                'pipeline_version_id': '29d72e09-aa5c-4da3-8d8a-80eef26d5b5a'},
 'recurring_run_id': None,
 'run_details': {'pipeline_context_id': None,
                 'pipeline_run_context_id': None,
                 'task_details': [{'child_tasks': [{'pod_name': 'diabetes-classification-pipeline-x2c22-2880010339',
                                                    'task_id': None}],
                                   'create_time': datetime.datetime(2025, 4, 11, 22, 29, 8, tzinfo=tzutc()),
                           

In [7]:
def get_metrics_v2(client, run_id: str):
    """Get metrics from a KFP v2 pipeline run"""
    # Get run details
    run = client.get_run(run_id)
    
    # Get the artifacts
    artifacts = client.list_run_artifacts(run_id).artifacts
    
    # Filter for metrics artifacts
    metrics_artifacts = [a for a in artifacts if a.type.name == "system.Metrics"]
    
    metrics_by_component = {}
    
    for artifact in metrics_artifacts:
        component_name = artifact.producer_component
        artifact_uri = artifact.uri
        
        # For GCS-based artifacts
        if artifact_uri.startswith("gs://"):
            metrics_content = client_utils.get_gcs_file(artifact_uri)
            metrics_by_component[component_name] = json.loads(metrics_content)
        # For Minio/S3 artifacts
        elif artifact_uri.startswith("minio://") or artifact_uri.startswith("s3://"):
            # You'll need appropriate S3 client code here
            pass
        # For other artifact storage systems
        else:
            # Try to read directly from the API if possible
            try:
                metrics_data = client._get_api_client().runs_service_api.read_artifact(
                    artifact.name
                ).data
                metrics_by_component[component_name] = json.loads(metrics_data)
            except Exception as e:
                metrics_by_component[component_name] = {"error": str(e)}
    
    return metrics_by_component

In [8]:
artifacts = client.list_run_artifacts('d30417fe-d5b3-4ea3-8b95-9d7cf448a500').artifacts

AttributeError: 'Client' object has no attribute 'list_run_artifacts'

In [None]:
get_metrics_v2(client, "d30417fe-d5b3-4ea3-8b95-9d7cf448a500")

In [16]:
get_kf_pipelines('diabetes-classification-pipeline', None, 10, '', 'created_at desc')



{'total_pipelines': 1,
 'next_page_token': None,
 'namespace_type': 'shared',
 'namespace': 'shared',
 'pipelines': [{'id': '58d4509f-970b-4979-9706-15c2e9dacc19',
   'name': 'diabetes-classification-pipeline',
   'description': 'A demonstration pipeline for diabetes classification using multiple models with artifact tracking ',
   'created_at': '2025-04-11 15:12:48+00:00'}]}

In [27]:
client.list_pipeline_versions('58d4509f-970b-4979-9706-15c2e9dacc19')

{'next_page_token': None,
 'pipeline_versions': [{'code_source_url': None,
                        'created_at': datetime.datetime(2025, 4, 11, 15, 12, 48, tzinfo=tzutc()),
                        'description': 'A demonstration pipeline for diabetes '
                                       'classification using multiple models '
                                       'with artifact tracking ',
                        'display_name': 'diabetes-classification-pipeline',
                        'error': None,
                        'package_url': None,
                        'pipeline_id': '58d4509f-970b-4979-9706-15c2e9dacc19',
                        'pipeline_spec': {'components': {'comp-compare-models': {'executorLabel': 'exec-compare-models',
                                                                                 'inputDefinitions': {'artifacts': {'dt_metrics': {'artifactType': {'schemaTitle': 'system.Metrics',
                                                             

In [3]:
HOST = 'http://huanew-kubeflow.ddns.net/pipeline'
USERNAME = "gfatouros@innov-acts.com"
PASSWORD = "Gxpnn8l4"
NAMESPACE = "innovacts"

kfp_client_manager = KFPClientManager(
    api_url=HOST,
    skip_tls_verify=True,

    dex_username=USERNAME,
    dex_password=PASSWORD,

    # can be 'ldap' or 'local' depending on your Dex configuration
    dex_auth_type="ldap",
)

# get a newly authenticated KFP client
# TIP: long-lived sessions might need to get a new client when their session expires
kfp_client = kfp_client_manager.create_kfp_client()

# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace=NAMESPACE)
print(experiments)

{'experiments': [{'created_at': datetime.datetime(2025, 3, 6, 14, 23, 53, tzinfo=tzutc()),
                  'description': 'Test-simple-ccn',
                  'display_name': 'med-minst-exp',
                  'experiment_id': '0d73987e-3e9e-4cd9-b6ad-66a29c1de208',
                  'namespace': 'innovacts',
                  'storage_state': 'AVAILABLE'},
                 {'created_at': datetime.datetime(2025, 3, 29, 0, 21, 36, tzinfo=tzutc()),
                  'description': None,
                  'display_name': 'Default',
                  'experiment_id': 'b6fa3bc7-e8af-4ea3-acf4-1ab328fa8f0e',
                  'namespace': 'innovacts',
                  'storage_state': 'AVAILABLE'}],
 'next_page_token': None,
 'total_size': 2}




In [2]:
# Fix MinIO connection settings
MINIO_ACCESS_KEY="JTnvpo8gGkioE6QhJAq7"
MINIO_SECRET_KEY="Ei281BuHUHihXf93PNhAUdIF8QV1Cm2mVhUbCPqn"
MINIO_SECURE=True
MINIO_ENDPOINT="minio.humaine-horizon.eu:9000"  # Remove the path component
MINIO_PATH="/api/v1/"  # Store the path separately if needed

from minio import Minio
from minio.error import S3Error

def get_minio_client() -> Minio:
    """Initialize and return a MinIO client."""
    client = Minio(
        endpoint=MINIO_ENDPOINT,  # Use just the domain part
        access_key=MINIO_ACCESS_KEY,
        secret_key=MINIO_SECRET_KEY,
        secure=MINIO_SECURE
    )
    
    # If you need to handle the path component, you might need to
    # set it in the client configuration or handle it when making requests
    return client



In [3]:
client = get_minio_client()
        
      