In [None]:
import kfp
from kfp import dsl
from typing import NamedTuple
import pandas as pd
from kfp.components import InputPath, InputTextFile, OutputPath, OutputTextFile

In [None]:
def download_data_func(id_data: str, id_label: str, id_pattern: str, path_data: OutputPath(str), path_label: OutputPath(str), path_pattern: OutputPath(str)):
    """Download the data, label and pattern files from Google Drive.

    Self-contained KFP component: all imports live inside the body.

    Args:
        id_data: Google Drive file id of the raw data file.
        id_label: Google Drive file id of the label (category) file.
        id_pattern: Google Drive file id of the regex pattern file.
        path_data: output path (provided by KFP) for the data file.
        path_label: output path (provided by KFP) for the label file.
        path_pattern: output path (provided by KFP) for the pattern file.
    """
    import os
    import pathlib

    import gdown

    downloads = (
        ("data", id_data, path_data),
        ("label", id_label, path_label),
        ("pattern", id_pattern, path_pattern),
    )
    for kind, file_id, destination in downloads:
        drive_url = f"https://drive.google.com/uc?id={file_id}"
        print(f"url {kind}:", drive_url)
        gdown.download(drive_url, destination, quiet=False)
        print(f">> Download {kind} success")

    # Debug output: show where the component runs and what it can see.
    cwd = pathlib.Path().resolve()
    print("current directory:", cwd)
    print("os.listdir:", os.listdir(cwd))

In [None]:
def process_data_func(path_data: InputPath(str), path_label: InputPath(str), path_pattern: InputPath(str), path_vip_data: OutputPath(str), path_id2label: OutputPath(str)):
    """Build the labelled training table and the ``label_id -> label`` mapping.

    Self-contained KFP component: all imports live inside the body.

    Args:
        path_data: tab-separated file without header, columns ``id, cate, name, content``;
            ``cate`` holds comma-separated category ids.
        path_label: CSV with ``cate_id`` and ``cate_name`` columns.
        path_pattern: text file holding a regex; every match is replaced by a space when
            normalizing item text and category names.
        path_vip_data: output CSV with one row per (item, known category) pair and the
            columns ``name, cate, content, label, label_id, text``.
        path_id2label: output JSON mapping ``label_id`` (serialized as a string key) to
            the label name.
    """
    import json
    import os
    import pathlib
    import re

    import pandas as pd
    from tqdm import tqdm

    # NOTE(review): the pattern is used verbatim, including a trailing newline if the file
    # has one -- confirm the pattern file is written without it.
    with open(path_pattern) as pattern_file:
        pattern = pattern_file.read()
    print("pattern", pattern)

    def text_preprocess(text: str) -> str:
        """Lower-case ``text``, replace pattern matches with spaces and collapse whitespace."""
        return ' '.join(re.sub(pattern, ' ', text.lower()).split())

    print("current directory:", pathlib.Path().resolve())
    print("os.listdir:", os.listdir(pathlib.Path().resolve()))

    df_label = pd.read_csv(path_label)
    print(">> Read input_path_label success!!!")

    # cate_id -> normalized category name; each pair of consecutive spaces left after the
    # regex substitution is turned into a ',' separator.
    dict_label = {}
    for cate_id, cate_name in zip(df_label['cate_id'], df_label['cate_name']):
        dict_label[cate_id] = re.sub(pattern, ' ', cate_name.lower()).strip().replace('  ', ',')

    # Read the text columns as strings: otherwise a file whose ``cate`` values never
    # contain a comma is parsed as numbers and ``.split(',')`` below fails.
    df_raw = pd.read_csv(path_data, sep="\t", names=["id", "cate", "name", "content"],
                         dtype={"cate": str, "name": str, "content": str})
    df_raw = df_raw.dropna(subset=['cate', 'name', 'content'])

    # One record per (item, category id) pair, keeping only ids present in the label file.
    records = []
    for _, row in tqdm(list(df_raw.iterrows())):
        clean_name = text_preprocess(row['name'])
        clean_content = text_preprocess(row['content'])
        for c in row['cate'].split(','):
            if int(c) in dict_label:
                records.append((clean_name, int(c), clean_content))
    df_data = pd.DataFrame(records, columns=["name", "cate", "content"])

    # Every ``cate`` left here is a key of dict_label (filtered above).
    df_data['label'] = df_data['cate'].map(dict_label)

    # Keep only categories that occur at least twice (presumably so the stratified split
    # in train_model_func has >= 2 samples per class). .copy() avoids writing to a view.
    df_data = df_data[df_data.duplicated('cate', keep=False)].copy()

    df_data["label"] = df_data["label"].astype('category')
    df_data["label_id"] = df_data["label"].cat.codes
    id2label = dict(enumerate(df_data["label"].cat.categories))
    with open(path_id2label, 'w') as outfile:
        json.dump(id2label, outfile)

    print(">> id2label:", id2label)

    num_classes = df_data['label'].nunique()
    print(">> num_classes:", num_classes)

    df_data['text'] = df_data['name'] + ' ' + df_data['content']
    df_data.to_csv(path_vip_data)

    print("current directory:", pathlib.Path().resolve())
    print("os.listdir:", os.listdir(pathlib.Path().resolve()))

In [None]:
def process_data4predict_func(path_data: InputPath(str), path_label: InputPath(str), path_pattern: InputPath(str), path_vip_data4predict: OutputPath(str)):
    """Build the multi-label table used to sample prediction examples.

    Self-contained KFP component: all imports live inside the body.

    Args:
        path_data: tab-separated file without header, columns ``id, cate, name, content``;
            ``cate`` holds comma-separated category ids.
        path_label: CSV with ``cate_id`` and ``cate_name`` columns.
        path_pattern: text file holding a regex used to normalize category names.
        path_vip_data4predict: output CSV with one row per item that has at least one
            known category; adds ``len_cate``, ``text`` (raw ``name + ' ' + content``)
            and ``label`` (deduplicated label names in category order).
    """
    import re

    import pandas as pd

    with open(path_pattern) as pattern_file:
        pattern = pattern_file.read()
    print("pattern", pattern)

    df_label = pd.read_csv(path_label)
    print(">> Read input_path_label success!!!")

    # cate_id -> normalized category name (same normalization as process_data_func).
    dict_label = {}
    for cate_id, cate_name in zip(df_label['cate_id'], df_label['cate_name']):
        dict_label[cate_id] = re.sub(pattern, ' ', cate_name.lower()).strip().replace('  ', ',')

    # Read the text columns as strings so ``cate`` is never parsed as a number, which
    # would break ``.split(',')`` when no row contains a comma.
    data = pd.read_csv(path_data, sep="\t", names=["id", "cate", "name", "content"],
                       dtype={"cate": str, "name": str, "content": str})
    data = data.dropna(subset=['cate', 'name', 'content']).copy()

    # Known category ids per row, in source order (duplicates kept as in the file).
    known_cates = [[int(c) for c in cates.split(',') if int(c) in dict_label]
                   for cates in data['cate']]
    data['cate'] = known_cates
    data['len_cate'] = [len(cates) for cates in known_cates]
    data = data[data['len_cate'] > 0].copy()

    data['text'] = data['name'] + ' ' + data['content']
    # Deduplicate label names while keeping first-seen order. The previous
    # list(set(...)) ordering depended on string-hash randomization and changed per run.
    data['label'] = [list(dict.fromkeys(dict_label[c] for c in cates)) for cates in data['cate']]

    data = data.reset_index(drop=True)
    data.to_csv(path_vip_data4predict)
    

In [None]:
def train_model_func(path_vip_data: InputPath(str)) -> NamedTuple("output", [("path_ckp", str), ("path_tokenizer", str)]):
    """Train the text classifier and save its artifacts to ``/data``.

    Self-contained KFP component: all imports live inside the body. ``/data`` is expected
    to be a mounted volume (the pipeline attaches a PVC there).

    Args:
        path_vip_data: CSV produced by ``process_data_func``; its ``text`` and
            ``label_id`` columns are used.

    Returns:
        ``output(path_ckp, path_tokenizer)``: the checkpoint with the lowest ``val_loss``
        and the pickled Keras ``Tokenizer`` fitted on the texts.

    Raises:
        ValueError: if the CSV has no rows, or more label ids than ``NUM_CLASSES``.
    """
    import os
    import pathlib
    import pickle
    from collections import namedtuple

    import numpy as np
    import pandas as pd
    import torch
    from torch import nn
    from torch.utils.data import TensorDataset, DataLoader, RandomSampler, SequentialSampler
    import pytorch_lightning as pl
    from pytorch_lightning.callbacks import ModelCheckpoint
    from keras.preprocessing.text import Tokenizer
    from keras_preprocessing.sequence import pad_sequences
    from sklearn.model_selection import train_test_split

    # predict_func rebuilds the network with the same hard-coded values
    # (4200 words, length 30, 50-dim embeddings, 92 classes); keep them in sync.
    MAX_SEQUENCE_LENGTH = 30
    MAX_NB_WORDS = 4200
    EMBED_DIM = 50
    NUM_CLASSES = 92
    BATCH_SIZE = 128
    NUM_WORKERS = 8
    MAX_EPOCHS = 3
    SEED = 2022
    DATA_DIR = "/data"

    def show_dirs():
        """Debug output: working directory and the contents of the shared volume."""
        cwd = pathlib.Path().resolve()
        print("current directory:", cwd)
        print("os.listdir:", os.listdir(cwd))
        print("os.listdir /data:", os.listdir(DATA_DIR))

    show_dirs()
    print("path_vip_data:", path_vip_data)

    df_data = pd.read_csv(path_vip_data)
    if df_data.empty:
        raise ValueError(f"no training rows in {path_vip_data}")
    # Fail fast with a clear message instead of an index error inside CrossEntropyLoss.
    n_labels = int(df_data['label_id'].max()) + 1
    if n_labels > NUM_CLASSES:
        raise ValueError(
            f"data has {n_labels} label ids but the model only has {NUM_CLASSES} outputs"
        )

    pl.seed_everything(SEED)

    # NOTE(review): the vocabulary is fitted on train + validation texts.
    tokenizer = Tokenizer(num_words=MAX_NB_WORDS, lower=True)
    tokenizer.fit_on_texts(np.array(df_data['text'].tolist()))

    train_X, val_X, train_Y, val_Y = train_test_split(
        df_data['text'], df_data['label_id'],
        random_state=SEED, test_size=0.3, stratify=df_data['label_id'],
    )

    def to_tensors(texts, labels):
        """Tokenize and post-pad ``texts`` to MAX_SEQUENCE_LENGTH; return (X, y) tensors."""
        seqs = tokenizer.texts_to_sequences(texts.tolist())
        padded = pad_sequences(seqs, maxlen=MAX_SEQUENCE_LENGTH, padding='post')
        return torch.tensor(padded), torch.tensor(labels.tolist())

    train_data = TensorDataset(*to_tensors(train_X, train_Y))
    val_data = TensorDataset(*to_tensors(val_X, val_Y))
    train_dataloader = DataLoader(train_data, sampler=RandomSampler(train_data),
                                  batch_size=BATCH_SIZE, num_workers=NUM_WORKERS)
    # Validation needs no shuffling; a sequential order keeps it deterministic.
    val_dataloader = DataLoader(val_data, sampler=SequentialSampler(val_data),
                                batch_size=BATCH_SIZE, num_workers=NUM_WORKERS)

    class Classify(pl.LightningModule):
        """Token embeddings -> flatten -> two ReLU hidden layers -> class logits."""

        def __init__(self, n_fea=50, vocab_size=2000, max_length=30, num_classes=300):
            super(Classify, self).__init__()
            self.embed_layer = nn.Embedding(vocab_size, n_fea)
            self.linear_0 = nn.Linear(max_length * n_fea, 1024)
            self.linear_1 = nn.Linear(1024, 512)
            self.linear_2 = nn.Linear(512, num_classes)
            self.relu_layer = nn.ReLU()
            self.criterion = nn.CrossEntropyLoss()

        def forward(self, x):
            # (batch, seq) token ids -> (batch, 1, seq * n_fea) -> (batch, 1, num_classes)
            x = self.embed_layer(x)
            x = x.view(x.shape[0], 1, -1)
            x = self.relu_layer(self.linear_0(x))
            x = self.relu_layer(self.linear_1(x))
            return self.linear_2(x)

        def loss(self, logits, labels):
            return self.criterion(logits, labels)

        def _batch_loss(self, batch):
            """Cross-entropy loss of one (x, y) batch."""
            x, y = batch
            # Squeeze only the singleton dim 1. A bare squeeze() also drops the batch
            # dim when a batch holds a single sample, and the loss then fails.
            logits = self.forward(x).squeeze(1)
            return self.loss(logits, y)

        def training_step(self, batch, batch_idx):
            loss = self._batch_loss(batch)
            self.log("train_loss", loss, on_epoch=True)
            return loss

        def validation_step(self, batch, batch_idx):
            loss = self._batch_loss(batch)
            self.log("val_loss", loss, on_epoch=True)
            return loss

        def predict_step(self, batch, batch_idx):
            x, y = batch
            return self(x)

        def configure_optimizers(self):
            return torch.optim.Adam(self.parameters(), lr=0.001)

    model = Classify(n_fea=EMBED_DIM, vocab_size=MAX_NB_WORDS,
                     max_length=MAX_SEQUENCE_LENGTH, num_classes=NUM_CLASSES)
    checkpoint_callback = ModelCheckpoint(
        save_top_k=1,
        monitor="val_loss",
        mode="min",
        dirpath=DATA_DIR,
        filename="best_ckp",
    )
    trainer = pl.Trainer(max_epochs=MAX_EPOCHS, callbacks=[checkpoint_callback])
    trainer.fit(model, train_dataloader, val_dataloader)

    path_tokenizer = os.path.join(DATA_DIR, 'tokenizer.pickle')
    with open(path_tokenizer, 'wb') as handle:
        pickle.dump(tokenizer, handle, protocol=pickle.HIGHEST_PROTOCOL)

    show_dirs()

    # ModelCheckpoint adds a version suffix (e.g. best_ckp-v1.ckpt) when the target file
    # already exists on the volume, so ask the callback rather than hard-coding the name.
    ckp_path = checkpoint_callback.best_model_path or os.path.join(DATA_DIR, 'best_ckp.ckpt')

    output = namedtuple("output", ["path_ckp", "path_tokenizer"])
    return output(ckp_path, path_tokenizer)
    
   
    


In [None]:
def predict_func(path_ckp: str, path_vip_data4predict: InputPath(str), path_id2label: InputPath(str), path_pattern: InputPath(str), path_tokenizer: str):
    """Load the trained checkpoint and print predictions for a random sample of rows.

    Self-contained KFP component: all imports live inside the body.

    Args:
        path_ckp: Lightning checkpoint written by ``train_model_func``.
        path_vip_data4predict: CSV from ``process_data4predict_func``; its ``text`` and
            ``label`` columns are read.
        path_id2label: JSON mapping a stringified ``label_id`` to its label name.
        path_pattern: regex file used for text normalization.
        path_tokenizer: pickled Keras ``Tokenizer`` written by ``train_model_func``.
    """
    import json
    import os
    import pathlib
    import pickle
    import random
    import re

    import pandas as pd
    import torch
    from torch import nn
    import pytorch_lightning as pl
    from keras_preprocessing.sequence import pad_sequences

    # Must match the values train_model_func used to build the network.
    MAX_NB_WORDS = 4200
    MAX_SEQUENCE_LENGTH = 30
    EMBED_DIM = 50
    NUM_CLASSES = 92
    SAMPLE_SIZE = 50

    cwd = pathlib.Path().resolve()
    print("current directory:", cwd)
    print("os.listdir:", os.listdir(cwd))
    print("os.listdir /data:", os.listdir('/data'))

    # NOTE(review): pickle.load can execute code from the file. It is acceptable only
    # because the tokenizer is produced by this pipeline on its own volume.
    with open(path_tokenizer, 'rb') as handle:
        tokenizer = pickle.load(handle)
    with open(path_pattern) as pattern_file:
        pattern = pattern_file.read()
    with open(path_id2label) as id2label_file:
        id2label = json.load(id2label_file)
    print(">> id2label", id2label)

    def text_preprocess(text: str) -> str:
        """Lower-case ``text``, replace pattern matches with spaces and collapse whitespace."""
        return ' '.join(re.sub(pattern, ' ', text.lower()).split())

    class Classify(pl.LightningModule):
        """Inference copy of the network defined in train_model_func.

        Attribute names must match the checkpoint's state_dict. Training-only hooks are
        omitted because this component never fits the model.
        """

        def __init__(self, n_fea=50, vocab_size=2000, max_length=30, num_classes=300):
            super(Classify, self).__init__()
            self.embed_layer = nn.Embedding(vocab_size, n_fea)
            self.linear_0 = nn.Linear(max_length * n_fea, 1024)
            self.linear_1 = nn.Linear(1024, 512)
            self.linear_2 = nn.Linear(512, num_classes)
            self.relu_layer = nn.ReLU()
            self.criterion = nn.CrossEntropyLoss()

        def forward(self, x):
            # (batch, seq) token ids -> (batch, 1, seq * n_fea) -> (batch, 1, num_classes)
            x = self.embed_layer(x)
            x = x.view(x.shape[0], 1, -1)
            x = self.relu_layer(self.linear_0(x))
            x = self.relu_layer(self.linear_1(x))
            return self.linear_2(x)

    def load_model(ckp_path):
        """Load the checkpoint onto the CPU in eval mode."""
        # map_location lets a checkpoint saved from a GPU run load in a CPU-only pod.
        model = Classify.load_from_checkpoint(
            ckp_path, map_location='cpu', n_fea=EMBED_DIM, vocab_size=MAX_NB_WORDS,
            max_length=MAX_SEQUENCE_LENGTH, num_classes=NUM_CLASSES,
        )
        model.to('cpu')
        model.eval()
        return model

    def predict(model, text, thres=0.8):
        """Return the categories whose sigmoid score exceeds ``thres``.

        NOTE(review): the model is trained with CrossEntropyLoss (softmax), but here every
        logit is squashed independently with a sigmoid -- confirm this is intended.
        """
        toks = tokenizer.texts_to_sequences([text_preprocess(text)])
        toks = torch.tensor(pad_sequences(toks, maxlen=MAX_SEQUENCE_LENGTH, padding='post'))
        result = {'cate': [], 'score': [], 'ids': []}
        with torch.no_grad():
            scores = torch.sigmoid(model(toks))[0][0]
        for index, s in enumerate(scores):
            label = id2label.get(str(index))
            # Output units beyond the labels seen in training have no name; skip them.
            if label is not None and s > thres:
                result['cate'].append(label)
                result['score'].append(round(float(s), 3))
        return result

    model = load_model(path_ckp)
    df_data = pd.read_csv(path_vip_data4predict)
    if df_data.empty:
        print(">> no rows to predict")
        return

    for _ in range(SAMPLE_SIZE):
        # randrange stays within the table. The old randint(0, 30000) raised IndexError
        # for tables with 30000 rows or fewer.
        row = df_data.iloc[random.randrange(len(df_data))]
        text = row['text']
        print(text)
        print(row['label'])
        r = predict(model, text)
        print(list(zip(r['cate'], r['score'])))
        print('=' * 50)
    
    
    

In [None]:
@dsl.pipeline(name="training_pipeline",
              description="hello, this is description of training pipeline")
def training_pipeline(id_data="1bABs9vHNHJvlZmGzK5kORQtOOkvLel3j", id_label="1bKG4ZsaBAEMRUXKTsHm26Lod5CsfLz-r", id_pattern="19LKS_RBIuAaKjy2gNqrFIUxc0INUQPEe"):
    """Download -> preprocess -> train -> predict pipeline.

    Args:
        id_data: Google Drive file id of the raw data file.
        id_label: Google Drive file id of the label (category) file.
        id_pattern: Google Drive file id of the regex pattern file.

    Training and prediction share one PVC mounted at ``/data``: train writes the
    checkpoint and tokenizer there and predict reads them back.
    """
    data_op = dsl.VolumeOp(name="categories-banner-volume",
                           resource_name="my-pvc",
                           size="4Gi",
                           storage_class="openebs-hostpath",
                           modes=dsl.VOLUME_MODE_RWO)

    # Packages for the train/predict containers. "sklearn" is the deprecated PyPI alias
    # that now refuses to install; the real distribution is "scikit-learn".
    # "keras-preprocessing" provides the keras_preprocessing module both steps import.
    # NOTE(review): Keras 3 no longer ships keras.preprocessing.text.Tokenizer; consider
    # pinning keras / tensorflow-cpu to a 2.x release.
    ml_packages = ["pandas", "keras", "keras-preprocessing", "scikit-learn",
                   "pytorch_lightning", "torch", "tensorflow-cpu"]

    download_data_op = kfp.components.func_to_container_op(download_data_func, packages_to_install=["gdown"])
    process_data_op = kfp.components.func_to_container_op(process_data_func, packages_to_install=["pandas", "tqdm"])
    process_data4predict_op = kfp.components.func_to_container_op(process_data4predict_func, packages_to_install=["pandas", "tqdm"])
    training_op = kfp.components.func_to_container_op(train_model_func, packages_to_install=list(ml_packages))
    predict_op = kfp.components.func_to_container_op(predict_func, packages_to_install=list(ml_packages))

    step0 = download_data_op(id_data=id_data, id_label=id_label, id_pattern=id_pattern)
    step1 = process_data_op(path_data=step0.outputs['path_data'], path_label=step0.outputs['path_label'], path_pattern=step0.outputs['path_pattern'])
    step2 = training_op(step1.outputs['path_vip_data']).add_pvolumes({"/data": data_op.volume})
    step3 = process_data4predict_op(path_data=step0.outputs['path_data'], path_label=step0.outputs['path_label'], path_pattern=step0.outputs['path_pattern'])
    step4 = predict_op(path_ckp=step2.outputs['path_ckp'], path_vip_data4predict=step3.outputs['path_vip_data4predict'], path_id2label=step1.outputs['path_id2label'], path_pattern=step0.outputs['path_pattern'], path_tokenizer=step2.outputs['path_tokenizer']).add_pvolumes({"/data": data_op.volume.after(step2)})
    

In [None]:
# Compile the pipeline definition into a KFP package that can be uploaded to Kubeflow Pipelines.
PIPELINE_PACKAGE = "new_sample_pipeline.yaml"
pipeline_compiler = kfp.compiler.Compiler()
pipeline_compiler.compile(pipeline_func=training_pipeline, package_path=PIPELINE_PACKAGE)