In [11]:
import re
from urllib.parse import urlsplit

import kfp
import requests
import urllib3


class KFPClientManager:
    """
    A class that creates `kfp.Client` instances with Dex authentication.

    The manager logs in against Dex with a username/password, collects the
    resulting session cookies and hands them to a freshly built `kfp.Client`.
    """

    def __init__(
        self,
        api_url: str,
        dex_username: str,
        dex_password: str,
        dex_auth_type: str = "local",
        skip_tls_verify: bool = False,
    ):
        """
        Initialize the KfpClient

        :param api_url: the Kubeflow Pipelines API URL
        :param skip_tls_verify: if True, skip TLS verification
        :param dex_username: the Dex username
        :param dex_password: the Dex password
        :param dex_auth_type: the auth type to use if Dex has multiple enabled, one of: ['ldap', 'local']
        :raises ValueError: if `dex_auth_type` is not one of the supported values
        """
        # ensure `dex_auth_type` is valid before storing anything
        if dex_auth_type not in ("ldap", "local"):
            raise ValueError(
                f"Invalid `dex_auth_type` '{dex_auth_type}', must be one of: ['ldap', 'local']"
            )

        self._api_url = api_url
        self._skip_tls_verify = skip_tls_verify
        self._dex_username = dex_username
        self._dex_password = dex_password
        self._dex_auth_type = dex_auth_type

    def _get_session_cookies(self) -> str:
        """
        Get the session cookies by authenticating against Dex

        :return: a string of session cookies in the form "key1=value1; key2=value2",
                 or an empty string when the endpoint did not redirect (unsecured)
        :raises RuntimeError: on a non-200 response, or when the login POST is not redirected
        """

        # use a persistent session (for cookies)
        s = requests.Session()

        # disable SSL verification, if requested
        if self._skip_tls_verify:
            s.verify = False
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # GET the api_url, which should redirect to Dex
        resp = s.get(self._api_url, allow_redirects=True)
        if resp.status_code != 200:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for GET against: {self._api_url}"
            )

        # if we were NOT redirected, then the endpoint is unsecured
        if len(resp.history) == 0:
            # no cookies are needed
            return ""

        # if we are at `/auth?=xxxx` path, we need to select an auth type
        url_obj = urlsplit(resp.url)
        if re.search(r"/auth$", url_obj.path):
            url_obj = url_obj._replace(
                path=re.sub(r"/auth$", f"/auth/{self._dex_auth_type}", url_obj.path)
            )

        # if we are at `/auth/xxxx/login` path, then we are at the login page
        if re.search(r"/auth/.*/login$", url_obj.path):
            dex_login_url = url_obj.geturl()
        else:
            # otherwise, we need to follow a redirect to the login page
            resp = s.get(url_obj.geturl(), allow_redirects=True)
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for GET against: {url_obj.geturl()}"
                )
            dex_login_url = resp.url

        # attempt Dex login
        resp = s.post(
            dex_login_url,
            data={"login": self._dex_username, "password": self._dex_password},
            allow_redirects=True,
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for POST against: {dex_login_url}"
            )

        # if we were NOT redirected, then the login credentials were probably invalid
        if len(resp.history) == 0:
            raise RuntimeError(
                f"Login credentials are probably invalid - "
                f"No redirect after POST to: {dex_login_url}"
            )

        return "; ".join(f"{c.name}={c.value}" for c in s.cookies)

    def _create_kfp_client(self) -> kfp.Client:
        """
        Authenticate against Dex and build a `kfp.Client` that carries the session cookies.

        :return: a `kfp.Client` (subclass) instance whose config honours `skip_tls_verify`
        :raises RuntimeError: if the Dex session cookies could not be obtained
        """
        try:
            session_cookies = self._get_session_cookies()
        except Exception as ex:
            raise RuntimeError("Failed to get Dex session cookies") from ex

        skip_tls_verify = self._skip_tls_verify

        # Override `_load_config` in a local subclass instead of patching `kfp.Client`
        # itself: patching the shared class leaks the setting into every other client
        # in the process and stacks one more wrapper on each call.
        # kfp only added support for disabling SSL verification in v2:
        # https://github.com/kubeflow/pipelines/pull/7174
        class _TLSConfigurableClient(kfp.Client):
            def _load_config(client_self, *args, **kwargs):
                config = super()._load_config(*args, **kwargs)
                config.verify_ssl = not skip_tls_verify
                return config

        return _TLSConfigurableClient(
            host=self._api_url,
            cookies=session_cookies,
        )

    def create_kfp_client(self) -> kfp.Client:
        """Get a newly authenticated Kubeflow Pipelines client."""
        return self._create_kfp_client()


In [2]:
pip install protobuf==3.19.* --user

Collecting protobuf==3.19.*
  Downloading protobuf-3.19.6-cp310-cp310-win_amd64.whl (895 kB)
     ---------------------------------------- 0.0/895.7 kB ? eta -:--:--
     ---- ---------------------------------- 92.2/895.7 kB 2.6 MB/s eta 0:00:01
     ----- -------------------------------- 122.9/895.7 kB 1.4 MB/s eta 0:00:01
     -------- ----------------------------- 194.6/895.7 kB 1.7 MB/s eta 0:00:01
     ---------------- --------------------- 399.4/895.7 kB 2.3 MB/s eta 0:00:01
     ----------------------------- -------- 686.1/895.7 kB 3.1 MB/s eta 0:00:01
     -------------------------------------  890.9/895.7 kB 3.5 MB/s eta 0:00:01
     -------------------------------------- 895.7/895.7 kB 3.0 MB/s eta 0:00:00
Installing collected packages: protobuf
  Attempting uninstall: protobuf
    Found existing installation: protobuf 4.24.4
    Uninstalling protobuf-4.24.4:
      Successfully uninstalled protobuf-4.24.4
Successfully installed protobuf-3.19.6
Note: you may need to restart th

  You can safely remove it manually.
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
feast 0.31.1 requires protobuf<5,>3.20, but you have protobuf 3.19.6 which is incompatible.
grpcio-reflection 1.56.0 requires protobuf>=4.21.6, but you have protobuf 3.19.6 which is incompatible.
tensorflow-intel 2.13.0 requires protobuf!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0dev,>=3.20.3, but you have protobuf 3.19.6 which is incompatible.


In [2]:
# pip list

In [12]:
# initialize a KFPClientManager
# Credentials must never be committed in a notebook: they are read from the
# environment (DEX_USERNAME / DEX_PASSWORD) and prompted for when unset.
import getpass
import os

kfp_client_manager = KFPClientManager(
    api_url=os.environ.get("KFP_API_URL", "https://kbf-dev.maestro.maersk.com/pipeline"),
    # NOTE(review): TLS verification is disabled; confirm this is intended for this endpoint
    skip_tls_verify=True,

    dex_username=os.environ.get("DEX_USERNAME") or input("Dex username: "),
    dex_password=os.environ.get("DEX_PASSWORD") or getpass.getpass("Dex password: "),

    dex_auth_type="local",
)

# get a newly authenticated KFP client
# TIP: long-lived sessions might need to get a new client when their session expires
kfp_client = kfp_client_manager.create_kfp_client()

# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace="sagar-kant")
print(experiments)



{'experiments': [{'created_at': datetime.datetime(2023, 10, 6, 7, 15, 58, tzinfo=tzutc()),
                  'description': None,
                  'display_name': 'Default',
                  'experiment_id': 'ab01fc28-681a-410a-a84a-ea4665bcad0b',
                  'namespace': 'sagar-kant',
                  'storage_state': 'AVAILABLE'},
                 {'created_at': datetime.datetime(2023, 10, 17, 6, 31, 38, tzinfo=tzutc()),
                  'description': None,
                  'display_name': 'xg_boost',
                  'experiment_id': '2bf812e7-836e-49a0-95d5-067d8e33958b',
                  'namespace': 'sagar-kant',
                  'storage_state': 'AVAILABLE'},
                 {'created_at': datetime.datetime(2023, 10, 17, 6, 31, 53, tzinfo=tzutc()),
                  'description': None,
                  'display_name': 'untitled',
                  'experiment_id': '52b96571-5b21-4d2b-8fae-f6f0a0fb1317',
                  'namespace': 'sagar-kant',
             

In [3]:
# import kfp
# from kfp import dsl

# def hello_op():
#     return dsl.ContainerOp(
#         name='Hello Kubeflow',
#         image='python:3.7',
#         command=['python', '-c'],
#         arguments=['print("Hello Kubeflow!")'],
#     )

# @dsl.pipeline(
#    name='Hello Kubeflow Pipeline',
#    description='A simple intro pipeline'
# )
# def hello_pipeline():
#     hello = hello_op()

In [7]:
import kfp.compiler as compiler

# Compile the pipeline function into a package named after the function.
# NOTE(review): `hello_pipeline` is only defined in a commented-out cell of this
# notebook, so this cell fails on a fresh kernel unless that cell is restored.
pipeline_func = hello_pipeline
pipeline_filename = f"{pipeline_func.__name__}.pipeline.tar.gz"
compiler.Compiler().compile(pipeline_func, pipeline_filename)




In [13]:
# Create the 'my-experiment' experiment in the 'sagar-kant' namespace.
experimentSagar = kfp_client.create_experiment(
    name='my-experiment',
    namespace='sagar-kant',
)

In [4]:
# import kfp
# # Assuming you've already set up the KFPClientManager as before
# # kfp_client = kfp_client_manager.create_kfp_client()

# # Upload the pipeline
# pipeline_package_path = './hello_pipeline.pipeline.tar.gz'
# pipeline_name = "Hello Kubeflow Pipeline"
# pipeline = kfp_client.upload_pipeline(pipeline_package_path, pipeline_name)

# # Run the pipeline
# experiment_name = 'Hello Kubeflow Experiment'
# run_name = pipeline_name + ' Run'
# experiment = kfp_client.create_experiment(experiment_name)
# run_result = kfp_client.run_pipeline(experiment.id, run_name, pipeline_package_path)


In [3]:
# List pipelines through the authenticated client; as the cell's last expression
# the response is rendered as the cell output.
kfp_client.list_pipelines()

{'next_page_token': 'eyJTb3J0QnlGaWVsZE5hbWUiOiJDcmVhdGVkQXRJblNlYyIsIlNvcnRCeUZpZWxkVmFsdWUiOjE2OTkwMDI5ODYsIlNvcnRCeUZpZWxkUHJlZml4IjoicGlwZWxpbmVzLiIsIktleUZpZWxkTmFtZSI6IlVVSUQiLCJLZXlGaWVsZFZhbHVlIjoiMDc1NjMzNmItMjgzYi00NGY5LWJhNmEtNjNhYTg2OTY2ZDQwIiwiS2V5RmllbGRQcmVmaXgiOiJwaXBlbGluZXMuIiwiSXNEZXNjIjpmYWxzZSwiTW9kZWxOYW1lIjoicGlwZWxpbmVzIiwiRmlsdGVyIjpudWxsfQ==',
 'pipelines': [{'created_at': datetime.datetime(2023, 10, 5, 11, 45, 32, tzinfo=tzutc()),
                'description': '[source '
                               'code](https://github.com/kubeflow/pipelines/tree/63ca91850a9f42a357f3417110a3011ddbf43290/samples/tutorials/Data%20passing%20in%20python%20components) '
                               'Shows how to pass data between python '
                               'components.',
                'display_name': '[Tutorial] Data passing in python components',
                'error': None,
                'namespace': None,
                'pipeline_id': '14c583d2-e634-4

In [14]:
# Experiment used by the pipeline runs submitted later in this notebook.
experiment = kfp_client.create_experiment(
    name='Demand-Forecast-Experimentation',
    namespace='sagar-kant',
)

In [15]:
# Show the ID of the experiment created above; later cells pass it as `experiment_id`.
experiment.experiment_id

'2463fe4d-e5fc-4d77-ac27-c8852cee5fc5'

In [6]:
# kfp_client.list_runs()

In [10]:
import os

# Location of the compiled pipeline package. The original hard-coded, user-specific
# path is kept as the fallback; set KFP_PIPELINE_PACKAGE to run from another machine.
pipeline_package_path = os.environ.get('KFP_PIPELINE_PACKAGE', 'C:/Users/SKA641/demo_DF_3.yaml')

# Submit a pipeline run from the pipeline package to the experiment held in `experiment`
kfp_client.create_run_from_pipeline_package(
    pipeline_package_path,
    experiment_id=experiment.experiment_id,
)

# Submit a pipeline run from the pipeline function 'pipeline_func' to the same experiment
# kfp_client.create_run_from_pipeline_func(pipeline_func, experiment_id=experiment.experiment_id)

RunPipelineResult(run_id=5f2fbd84-6388-4cdb-bbc6-d1e722ccaff7)

In [44]:
from typing import List

from kfp import client
from kfp import dsl
from kfp.dsl import Dataset
from kfp.dsl import Input
# https://github.com/KantSagar/SampleData/blob/ff59cdd8c3c3f7cc3ecad61ef1cb3be32a19b2db/train.csv
from kfp.dsl import Model
from kfp.dsl import Output

# 'numpy==1.3.0','pytz==2023.2'
@dsl.component(packages_to_install=['pandas==1.3.5','numpy==1.21.6','pytz==2022.1','python-dateutil==2.8.2','six==1.16.0'])
def create_dataset(iris_dataset: Output[Dataset]):
    """Download the iris data and store it as a CSV output artifact.

    Args:
        iris_dataset: Output artifact that receives the CSV (header row, no index column).
    """
    import pandas as pd

    # Use the raw-content URL: the github.com `/blob/` URL serves an HTML page,
    # not the file itself, so `read_csv` would not see CSV data.
    csv_url = 'https://raw.githubusercontent.com/KantSagar/SampleData/63d7d1fcb344b5aab4eadd14ec2095fad334bb87/iris.txt'
    col_names = [
        'Sepal_Length', 'Sepal_Width', 'Petal_Length', 'Petal_Width', 'Labels'
    ]
    df = pd.read_csv(csv_url, names=col_names)
    print(df.head(1))

    with open(iris_dataset.path, 'w') as f:
        # index=False: otherwise the row index is written as an extra unnamed
        # column, which downstream components would read back as a feature.
        df.to_csv(f, index=False)


@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2','numpy==1.21.6','pytz==2022.1','python-dateutil==2.8.2','six==1.16.0','scipy==1.7.3','joblib==1.1.0','threadpoolctl==3.1.0'])
def normalize_dataset(
    input_iris_dataset: Input[Dataset],
    normalized_iris_dataset: Output[Dataset],
    standard_scaler: bool,
    min_max_scaler: bool,
):
    """Scale the feature columns of the iris CSV and write the result as a CSV artifact.

    Args:
        input_iris_dataset: Input CSV artifact containing a 'Labels' column.
        normalized_iris_dataset: Output artifact that receives the scaled CSV.
        standard_scaler: Scale with sklearn's StandardScaler.
        min_max_scaler: Scale with sklearn's MinMaxScaler.

    Raises:
        ValueError: if both flags are set or neither is.
    """
    if standard_scaler == min_max_scaler:
        raise ValueError(
            'Exactly one of standard_scaler or min_max_scaler must be True.')

    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.preprocessing import StandardScaler

    with open(input_iris_dataset.path) as f:
        df = pd.read_csv(f)
    # Drop an index column written by an upstream `to_csv` (read back as 'Unnamed: 0'),
    # so that row numbers are not treated as a feature.
    df = df.loc[:, ~df.columns.str.startswith('Unnamed')].copy()
    labels = df.pop('Labels')

    # The guard above guarantees exactly one flag is set.
    scaler = StandardScaler() if standard_scaler else MinMaxScaler()

    # Keep the original column names and index so 'Labels' re-aligns row by row.
    df = pd.DataFrame(scaler.fit_transform(df), columns=df.columns, index=df.index)
    df['Labels'] = labels
    with open(normalized_iris_dataset.path, 'w') as f:
        df.to_csv(f, index=False)


@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2','numpy==1.21.6','pytz==2022.1','python-dateutil==2.8.2','six==1.16.0','scipy==1.7.3','joblib==1.1.0','threadpoolctl==3.1.0'])
def train_model(
    normalized_iris_dataset: Input[Dataset],
    model: Output[Model],
    n_neighbors: int,
):
    """Fit a k-nearest-neighbours classifier on the iris CSV and pickle it.

    Args:
        normalized_iris_dataset: Input CSV artifact containing a 'Labels' column.
        model: Output artifact that receives the pickled classifier.
        n_neighbors: Number of neighbours passed to KNeighborsClassifier.
    """
    import pickle

    import pandas as pd
    from sklearn.model_selection import train_test_split
    from sklearn.neighbors import KNeighborsClassifier

    with open(normalized_iris_dataset.path) as f:
        df = pd.read_csv(f)

    # Drop an index column written by an upstream `to_csv` (read back as 'Unnamed: 0');
    # using row numbers as a feature would distort the classifier.
    df = df.loc[:, ~df.columns.str.startswith('Unnamed')].copy()

    y = df.pop('Labels')
    X = df

    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

    clf = KNeighborsClassifier(n_neighbors=n_neighbors)
    clf.fit(X_train, y_train)
    # Report the hold-out score so the split is actually used and shows up in the step logs.
    print(f'Hold-out accuracy: {clf.score(X_test, y_test):.3f}')

    with open(model.path, 'wb') as f:
        pickle.dump(clf, f)


@dsl.pipeline(name='iris-training-pipeline')
def my_pipeline(
    standard_scaler: bool,
    min_max_scaler: bool,
    neighbors: int
):
    """Create the iris dataset, normalize it and train a KNN classifier.

    Args:
        standard_scaler: Forwarded to `normalize_dataset`.
        min_max_scaler: Forwarded to `normalize_dataset`.
        neighbors: Forwarded to `train_model` as `n_neighbors`.
    """
    create_dataset_task = create_dataset()

    # Forward the pipeline parameters; previously these were hard-coded to
    # True/False, so the values supplied at run submission were silently ignored.
    normalize_dataset_task = normalize_dataset(
        input_iris_dataset=create_dataset_task.outputs['iris_dataset'],
        standard_scaler=standard_scaler,
        min_max_scaler=min_max_scaler)

    train_model(
        normalized_iris_dataset=normalize_dataset_task.outputs['normalized_iris_dataset'],
        n_neighbors=neighbors)


# endpoint = 'http://localhost:8080/'
# kfp_client = client.Client(host=endpoint)
# Submit the iris pipeline as a run in the experiment held in `experiment`.
run = kfp_client.create_run_from_pipeline_func(
    my_pipeline,
    arguments=dict(
        min_max_scaler=True,
        standard_scaler=False,
        neighbors=3,
    ),
    namespace="sagar-kant",
    experiment_id=experiment.experiment_id,
)
# url = f'{endpoint}/#/runs/details/{run.run_id}'
# print(url)

In [7]:
from typing import List

from kfp import client
from kfp import dsl
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import Model
from kfp.dsl import Output

@dsl.component(packages_to_install=['pandas==1.3.5','numpy==1.21.6','pytz==2022.1','python-dateutil==2.8.2','six==1.16.0'])
def create_dataset(train_dataset: Output[Dataset],test_dataset: Output[Dataset],fullfil_center_dataset: Output[Dataset],meal_info_dataset: Output[Dataset],):
    """Download the four demand-forecasting CSVs and store each as an output artifact.

    Every declared output is written; previously none were, so any downstream
    component that opened one of these artifacts had nothing to read.

    Args:
        train_dataset: Receives train.csv.
        test_dataset: Receives test.csv.
        fullfil_center_dataset: Receives fulfilment_center_info.csv.
        meal_info_dataset: Receives meal_info.csv.
    """
    import pandas as pd

    base_url = 'https://raw.githubusercontent.com/KantSagar/SampleData/master'
    sources = [
        (base_url + '/train.csv', train_dataset),
        (base_url + '/test.csv', test_dataset),
        (base_url + '/fulfilment_center_info.csv', fullfil_center_dataset),
        (base_url + '/meal_info.csv', meal_info_dataset),
    ]

    for csv_url, artifact in sources:
        df = pd.read_csv(csv_url)
        print(df.head(1))
        with open(artifact.path, 'w') as f:
            # index=False keeps the row index out of the file so it is not
            # read back downstream as an extra unnamed column.
            df.to_csv(f, index=False)

@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2','numpy==1.21.6','pytz==2022.1','python-dateutil==2.8.2','six==1.16.0','scipy==1.7.3','joblib==1.1.0','threadpoolctl==3.1.0'])
def train_model(
    train_dataset: Input[Dataset],
    test_dataset: Input[Dataset],
    # fullfil_center_dataset: Input[Dataset],
    # meal_info_dataset: Input[Dataset],
    model: Output[Model],
    n_neighbors: int,
):
    """Fit a linear regression predicting 'num_orders' and pickle the fitted model.

    Args:
        train_dataset: Input CSV artifact containing a 'num_orders' column.
        test_dataset: Accepted for interface compatibility; not read by this component.
        model: Output artifact that receives the pickled LinearRegression.
        n_neighbors: Accepted for interface compatibility; not used by the regression.
    """
    import pickle

    import pandas as pd
    from sklearn.linear_model import LinearRegression
    from sklearn.metrics import mean_squared_error
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import LabelEncoder

    with open(train_dataset.path) as f5:
        train = pd.read_csv(f5)

    # Drop an index column written by an upstream `to_csv` (read back as 'Unnamed: 0').
    data_cp = train.loc[:, ~train.columns.str.startswith('Unnamed')].copy()

    # These categorical columns only exist once the centre / meal tables have been
    # merged in. This component receives the train table alone, so encode only the
    # ones that are present instead of failing with a KeyError.
    encoded_names = {
        'center_type': 'make_Cent_type',
        'category': 'make_category',
        'cuisine': 'make_cuisine',
    }
    present = [col for col in encoded_names if col in data_cp.columns]
    for col in present:
        data_cp[encoded_names[col]] = LabelEncoder().fit_transform(data_cp[col])
    cp_data = data_cp.drop(present, axis=1)

    X = cp_data.drop('num_orders', axis=1)
    y = cp_data['num_orders']
    print(X.head(1))
    print(y.head(1))
    X_train, X_test, y_train, y_test = train_test_split(X, y,test_size=0.2,train_size=0.8,random_state=0)
    lin_reg_model = LinearRegression()
    lin_reg_model.fit(X_train, y_train)

    # Report the errors: the train RMSE used to be computed and then discarded.
    rmse_train = mean_squared_error(y_train, lin_reg_model.predict(X_train))**(0.5)
    rmse_test = mean_squared_error(y_test, lin_reg_model.predict(X_test))**(0.5)
    print(f'RMSE train: {rmse_train:.4f}')
    print(f'RMSE test: {rmse_test:.4f}')

    with open(model.path, 'wb') as f:
        pickle.dump(lin_reg_model, f)
@dsl.pipeline(name='DF-training-pipeline')
def my_pipeline(
    neighbors: int
):
    # Step 1: produce the dataset artifacts.
    dataset_step = create_dataset()
    dataset_outputs = dataset_step.outputs

    # Step 2: train on the train/test artifacts from step 1.
    train_model(
        train_dataset=dataset_outputs['train_dataset'],
        test_dataset=dataset_outputs['test_dataset'],
        n_neighbors=neighbors,
    )


# endpoint = 'http://localhost:8080/'
# kfp_client = client.Client(host=endpoint)
# Submit the pipeline as a run in the experiment held in `experiment`.
# NOTE(review): the recorded output of this cell is a NameError for `experiment`;
# the cell that creates it must be run first.
run = kfp_client.create_run_from_pipeline_func(
    my_pipeline,
    arguments=dict(neighbors=3),
    namespace="sagar-kant",
    experiment_id=experiment.experiment_id,
)



NameError: name 'experiment' is not defined

In [16]:
from typing import List

from kfp import client
from kfp import dsl
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import Model
from kfp.dsl import Output

@dsl.component(packages_to_install=['pandas==1.3.5','numpy==1.21.6','pytz==2022.1','python-dateutil==2.8.2','six==1.16.0'])
def create_dataset(train_dataset: Output[Dataset],test_dataset: Output[Dataset],fullfil_center_dataset: Output[Dataset],meal_info_dataset: Output[Dataset],):
    """Download the four demand-forecasting CSVs and store each as an output artifact.

    Args:
        train_dataset: Receives train.csv.
        test_dataset: Receives test.csv.
        fullfil_center_dataset: Receives fulfilment_center_info.csv.
        meal_info_dataset: Receives meal_info.csv.
    """
    import pandas as pd

    base_url = 'https://raw.githubusercontent.com/KantSagar/SampleData/master'
    sources = [
        (base_url + '/train.csv', train_dataset),
        (base_url + '/test.csv', test_dataset),
        (base_url + '/fulfilment_center_info.csv', fullfil_center_dataset),
        (base_url + '/meal_info.csv', meal_info_dataset),
    ]

    for csv_url, artifact in sources:
        df = pd.read_csv(csv_url)
        print(df.head(1))
        with open(artifact.path, 'w') as f:
            # index=False: otherwise each file gains an unnamed index column, which
            # survives the downstream merges and ends up as a regression feature.
            df.to_csv(f, index=False)

@dsl.component(packages_to_install=['pandas==1.3.5', 'scikit-learn==1.0.2','numpy==1.21.6','pytz==2022.1','python-dateutil==2.8.2','six==1.16.0','scipy==1.7.3','joblib==1.1.0','threadpoolctl==3.1.0'])
def train_model(
    train_dataset: Input[Dataset],
    test_dataset: Input[Dataset],
    fullfil_center_dataset: Input[Dataset],
    meal_info_dataset: Input[Dataset],
    model: Output[Model],
    n_neighbors: int,
):
    """Merge the order, centre and meal tables, fit a linear regression on
    'num_orders' and pickle the fitted model.

    Args:
        train_dataset: Input CSV artifact with the training rows.
        test_dataset: Accepted for interface compatibility; not read by this component.
        fullfil_center_dataset: Input CSV artifact joined on 'center_id'.
        meal_info_dataset: Input CSV artifact joined on 'meal_id'.
        model: Output artifact that receives the pickled LinearRegression.
        n_neighbors: Accepted for interface compatibility; not used by the regression.
    """
    import pickle

    import pandas as pd
    from sklearn.linear_model import LinearRegression
    from sklearn.metrics import mean_squared_error
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import LabelEncoder

    def read_table(artifact):
        # Read a CSV artifact and drop any index column written by an upstream
        # `to_csv` (read back as 'Unnamed: 0'), so row numbers never become features.
        with open(artifact.path) as handle:
            table = pd.read_csv(handle)
        return table.loc[:, ~table.columns.str.startswith('Unnamed')].copy()

    train = read_table(train_dataset)
    fullfil_center = read_table(fullfil_center_dataset)
    meal_info = read_table(meal_info_dataset)

    data = pd.merge(train, fullfil_center, on='center_id')
    print(data.head(1))
    all_data = pd.merge(data, meal_info, on='meal_id')
    data_cp = all_data.copy()

    # Label-encode the categorical columns, then drop the raw text versions.
    encoded_names = {
        'center_type': 'make_Cent_type',
        'category': 'make_category',
        'cuisine': 'make_cuisine',
    }
    for raw_col, encoded_col in encoded_names.items():
        data_cp[encoded_col] = LabelEncoder().fit_transform(data_cp[raw_col])
    cp_data = data_cp.drop(list(encoded_names), axis=1)

    X = cp_data.drop('num_orders', axis=1)
    y = cp_data['num_orders']
    print(X.head(1))
    print(y.head(1))
    X_train, X_test, y_train, y_test = train_test_split(X, y,test_size=0.2,train_size=0.8,random_state=0)
    lin_reg_model = LinearRegression()
    lin_reg_model.fit(X_train, y_train)

    # Report the errors: the train RMSE used to be computed and then discarded.
    rmse_train = mean_squared_error(y_train, lin_reg_model.predict(X_train))**(0.5)
    rmse_test = mean_squared_error(y_test, lin_reg_model.predict(X_test))**(0.5)
    print(f'RMSE train: {rmse_train:.4f}')
    print(f'RMSE test: {rmse_test:.4f}')

    with open(model.path, 'wb') as f:
        pickle.dump(lin_reg_model, f)
@dsl.pipeline(name='Demand-Forcasting-MW')
def my_pipeline(
    neighbors: int
):
    # Step 1: produce the four dataset artifacts.
    dataset_step = create_dataset()
    dataset_outputs = dataset_step.outputs

    # Step 2: train on the artifacts from step 1.
    train_model(
        train_dataset=dataset_outputs['train_dataset'],
        test_dataset=dataset_outputs['test_dataset'],
        fullfil_center_dataset=dataset_outputs['fullfil_center_dataset'],
        meal_info_dataset=dataset_outputs['meal_info_dataset'],
        n_neighbors=neighbors,
    )


# endpoint = 'http://localhost:8080/'
# kfp_client = client.Client(host=endpoint)
# Submit the demand-forecasting pipeline as a run in the experiment held in `experiment`.
run = kfp_client.create_run_from_pipeline_func(
    my_pipeline,
    arguments=dict(neighbors=3),
    namespace="sagar-kant",
    experiment_id=experiment.experiment_id,
)