In [1]:
import pandas as pd
import scipy.sparse as sparse
import numpy as np
from cmflib import cmf
from cmflib.cmf import metadata_push, artifact_push
import os
import io
import re 
import sys
import yaml
import gzip
import pickle
import random
import typing as t
import collections
import click
import xml.etree.ElementTree
from cmflib import cmfquery
from cmflib.cli.utils import find_root
from cmflib.utils.cmf_config import CmfConfig
import requests
from sklearn.feature_extraction.text import (CountVectorizer, TfidfTransformer)
from sklearn.ensemble import RandomForestClassifier
import json
import math
import sklearn.metrics as metrics
from tabulate import tabulate


In [4]:
#CHECK WHETHER CMF IS INITIALIZED
_=cmf.cmf_init_show()

remote.local-storage.url=/home/user/local-storage
core.remote=local-storage
cmf-server-ip = http://localhost:8080


In [3]:
#INITIALIZING LOCAL REPOSITORY
init=cmf.cmf_init(type="local",path="/home/user/local-storage",git_remote_url="http://github.com",cmf_server_url="http://localhost:8080")

Starting cmf init.
Setting 'local-storage' as a default remote.
cmf init complete.


In [5]:
__all__ = ['parse']
def _process_posts(fd_in: t.IO, fd_out_train: t.IO, fd_out_test: t.IO, target_tag: str, split: int) -> None:
    for idx, line in enumerate(fd_in):
        try:
            fd_out = fd_out_train if random.random() > split else fd_out_test
            attr = xml.etree.ElementTree.fromstring(line).attrib

            pid = attr.get("Id", "")
            label = 1 if target_tag in attr.get("Tags", "") else 0
            title = re.sub(r"\s+", " ", attr.get("Title", "")).strip()
            body = re.sub(r"\s+", " ", attr.get("Body", "")).strip()
            text = title + " " + body

            fd_out.write("{}\t{}\t{}\n".format(pid, label, text))
        except Exception as ex:
            sys.stderr.write(f"Skipping the broken line {idx}: {ex}\n")
def parse(input_file: str, output_dir: str) -> None:
    """ Parse input file (input_file) and create train/test files in output_dir directory.
    Args:
         input_file: Path to a compressed (.gz) XML-lines file (data.xml.gz).
         output_dir: Path to a directory that will contain train (train.tsv) and test (test.tsv) files.

    Machine Learning Artifacts:
        Input: ${input_file}
        Output: ${output_dir}/train.tsv, ${output_dir}/test.tsv
    """
    params = yaml.safe_load(open("params.yaml"))["parse"]
    random.seed(params["seed"])
    graph_env = os.getenv("NEO4J", "False")
    graph = True if graph_env == "True" or graph_env == "TRUE" else False
    metawriter = cmf.Cmf(filename="mlmd", pipeline_name="Test-env", graph=graph)
    _ = metawriter.create_context(pipeline_stage="Prepare", custom_properties={"user-metadata1": "metadata_value"})
    _ = metawriter.create_execution(execution_type="Prepare", custom_properties=params)
    _ = metawriter.log_dataset(input_file, "input", custom_properties={"user-metadata1": "metadata_value"})

    os.makedirs(output_dir, exist_ok=True)
    Dataset = collections.namedtuple('Dataset', ['train', 'test'])
    output_ds = Dataset(train=os.path.join(output_dir, "train.tsv"), test=os.path.join(output_dir, "test.tsv"))

    with gzip.open(input_file, "rb") as fd_in,\
         io.open(output_ds.train, "w", encoding="utf8") as fd_out_train,\
         io.open(output_ds.test, "w", encoding="utf8") as fd_out_test:
        _process_posts(fd_in, fd_out_train, fd_out_test, "<python>", params["split"])

    _ = metawriter.log_dataset(output_ds.train, "output")
    _ = metawriter.log_dataset(output_ds.test, "output")


def parse_cli(input_file: str, output_dir: str) -> None:
    parse(input_file, output_dir)

parse_cli("artifacts/data.xml.gz", "artifacts/parsed")

*** Note: CMF will check out a new branch in git to commit the metadata files ***
*** The checked out branch is mlmd. ***


\u280b Checking graph graph
\u280b Checking graph graph
\u2839 Checking graph graph
[?25h

In [6]:
__all__ = ['featurize']
def _get_df(data: str) -> pd.DataFrame:
    df = pd.read_csv(
        data,
        encoding="utf-8",
        header=None,
        delimiter="\t",
        names=["id", "label", "text"],
    )
    sys.stderr.write(f"The input data frame {data} size is {df.shape}\n")
    return df


def _save_matrix(df: pd.DataFrame, matrix, output: str) -> None:
    id_matrix = sparse.csr_matrix(df.id.astype(np.int64)).T
    label_matrix = sparse.csr_matrix(df.label.astype(np.int64)).T

    result = sparse.hstack([id_matrix, label_matrix, matrix], format="csr")

    msg = "The output matrix {} size is {} and data type is {}\n"
    sys.stderr.write(msg.format(output, result.shape, result.dtype))

    with open(output, "wb") as fd:
        pickle.dump(result, fd)

def featurize(input_dir: str, output_dir: str) -> None:
    """ Create train and test Machine Learning datasets.
    Args:
        input_dir: Path to a directory containing train.tsv and test.tsv files.
        output_dir: Path to a directory that will contain train.pkl and test.pkl files.

    Machine Learning Artifacts:
        Input: ${input_dir}/train.tsv, ${input_dir}/test.tsv
        Output: ${output_dir}/train.pkl, ${output_dir}/test.pkl
    """
    params = yaml.safe_load(open("params.yaml"))["featurize"]
    np.set_printoptions(suppress=True)

    Dataset = collections.namedtuple('Dataset', ['train', 'test'])
    input_ds = Dataset(train=os.path.join(input_dir, "train.tsv"), test=os.path.join(input_dir, "test.tsv"))

    os.makedirs(output_dir, exist_ok=True)
    output_ds = Dataset(train=os.path.join(output_dir, "train.pkl"), test=os.path.join(output_dir, "test.pkl"))
    graph_env = os.getenv("NEO4J", "False")
    graph = True if graph_env == "True" or graph_env == "TRUE" else False
    metawriter = cmf.Cmf(filename="mlmd", pipeline_name="Test-env", graph=graph)

    _ = metawriter.create_context(pipeline_stage="Featurize")
    _ = metawriter.create_execution(execution_type="Featurize-execution", custom_properties=params)

    _ = metawriter.log_dataset(input_ds.train, "input")
    _ = metawriter.log_dataset(input_ds.test, "input")

    # Generate train feature matrix
    df_train = _get_df(input_ds.train)
    train_words = np.array(df_train.text.str.lower().values.astype("U"))

    bag_of_words = CountVectorizer(
        stop_words="english", max_features=params["max_features"], ngram_range=(1, params["ngrams"])
    )

    bag_of_words.fit(train_words)
    train_words_binary_matrix = bag_of_words.transform(train_words)
    tfidf = TfidfTransformer(smooth_idf=False)
    tfidf.fit(train_words_binary_matrix)
    train_words_tfidf_matrix = tfidf.transform(train_words_binary_matrix)

    _save_matrix(df_train, train_words_tfidf_matrix, output_ds.train)

    # Generate test feature matrix
    df_test = _get_df(input_ds.test)
    test_words = np.array(df_test.text.str.lower().values.astype("U"))
    test_words_binary_matrix = bag_of_words.transform(test_words)
    test_words_tfidf_matrix = tfidf.transform(test_words_binary_matrix)

    _save_matrix(df_test, test_words_tfidf_matrix, output_ds.test)

    _ = metawriter.log_dataset(output_ds.train, "output")
    _ = metawriter.log_dataset(output_ds.test, "output")

def featurize_cli(input_dir: str, output_dir: str) -> None:
    featurize(input_dir, output_dir)

featurize_cli("artifacts/parsed", "artifacts/features")


*** Note: CMF will check out a new branch in git to commit the metadata files ***
*** The checked out branch is mlmd. ***


\u2819 Checking graph graph
\u280b Checking graph graph
The input data frame artifacts/parsed/train.tsv size is (20017, 3)
The output matrix artifacts/features/train.pkl size is (20017, 3002) and data type is float64
The input data frame artifacts/parsed/test.tsv size is (4983, 3)
The output matrix artifacts/features/test.pkl size is (4983, 3002) and data type is float64
\u2819 Checking graph graph
\u2819 Checking graph graph
[?25h

In [7]:
__all__ = ['train']


def train(input_dir: str, output_dir: str) -> None:
    """Train Machine Learning model.
    Args:
        input_dir: Path to a directory containing train.pkl file.
        output_dir: Path to a directory that will contain model.pkl file.

    Machine Learning Artifacts:
        Input: ${input_dir}/train.pkl
        Output: ${output_dir}/model.pkl
    """
    params = yaml.safe_load(open("params.yaml"))["train"]
    graph_env = os.getenv("NEO4J", "False")
    graph = True if graph_env == "True" or graph_env == "TRUE" else False
    metawriter = cmf.Cmf(filename="mlmd", pipeline_name="Test-env", graph=graph)
    _ = metawriter.create_context(pipeline_stage="Train")
    _ = metawriter.create_execution(execution_type="Train-execution", custom_properties=params)

    train_ds = os.path.join(input_dir, "train.pkl")
    _ = metawriter.log_dataset(train_ds, "input")
    with open(train_ds, "rb") as fd:
        matrix = pickle.load(fd)

    labels = np.squeeze(matrix[:, 1].toarray())
    x = matrix[:, 2:]

    sys.stderr.write("Input matrix size {}\n".format(matrix.shape))
    sys.stderr.write("X matrix size {}\n".format(x.shape))
    sys.stderr.write("Y matrix size {}\n".format(labels.shape))

    clf = RandomForestClassifier(
        n_estimators=params["n_est"], min_samples_split=params["min_split"], n_jobs=2, random_state=params["seed"]
    )
    clf.fit(x, labels)

    os.makedirs(output_dir, exist_ok=True)
    model_file = os.path.join(output_dir, 'model.pkl')
    with open(model_file, "wb") as fd:
        pickle.dump(clf, fd)

    _ = metawriter.log_model(
        path=model_file, event="output", model_framework="SKlearn", model_type="RandomForestClassifier",
        model_name="RandomForestClassifier:default"
    )
def train_cli(input_dir: str, output_dir: str) -> None:
    train(input_dir, output_dir)
 
train_cli('artifacts/features', 'artifacts/model')

*** Note: CMF will check out a new branch in git to commit the metadata files ***
*** The checked out branch is mlmd. ***


\u280b Checking graph graph
Input matrix size (20017, 3002)
X matrix size (20017, 3000)
Y matrix size (20017,)
\u2839 Checking graph graph
[?25h

In [8]:
__all__ = ['test']

def test(model_dir: str, dataset_dir: str, output_dir: str) -> None:
    """ Test machine learning model.
    Args:
        model_dir: Path to a directory containing model.pkl file.
        dataset_dir: Path to a directory containing test.tsv file.
        output_dir: Path to a dataset that will contain several files with performance metrics (scores.json, prc.json
            and roc.json).

    Machine Learning Artifacts:
        Input: ${model_dir}/model.pkl, ${dataset_dir}/test.pkl
        Output: ExecutionMetrics
    """
    os.makedirs(output_dir, exist_ok=True)
    Artifacts = collections.namedtuple('Artifacts', ['model', 'dataset', 'scores', 'prc', 'roc'])
    artifacts = Artifacts(
        model=os.path.join(model_dir, 'model.pkl'),
        dataset=os.path.join(dataset_dir, "test.pkl"),
        scores=os.path.join(output_dir, 'scores.json'),
        prc=os.path.join(output_dir, 'prc.json'),
        roc=os.path.join(output_dir, 'roc.json')
    )
    graph_env = os.getenv("NEO4J", "False")
    graph = True if graph_env == "True" or graph_env == "TRUE" else False
    metawriter = cmf.Cmf(filename="mlmd", pipeline_name="Test-env", graph=graph)
    _ = metawriter.create_context(pipeline_stage="Evaluate")
    _ = metawriter.create_execution(execution_type="Evaluate-execution")

    # TODO: Sergey - how do I know these custom properties here?
    metawriter.log_model(
        path=artifacts.model, event="input", model_framework="sklearn", model_type="RandomForest",
        model_name="RandomForest_default"
    )
    _ = metawriter.log_dataset(artifacts.dataset, "input")

    with open(artifacts.model, "rb") as fd:
        model = pickle.load(fd)
    with open(artifacts.dataset, "rb") as fd:
        dataset = pickle.load(fd)

    labels = dataset[:, 1].toarray()
    x = dataset[:, 2:]

    predictions_by_class = model.predict_proba(x)
    predictions = predictions_by_class[:, 1]

    precision, recall, prc_thresholds = metrics.precision_recall_curve(labels, predictions)
    fpr, tpr, roc_thresholds = metrics.roc_curve(labels, predictions)

    avg_prec = metrics.average_precision_score(labels, predictions)
    roc_auc = metrics.roc_auc_score(labels, predictions)

    # ROC has a drop_intermediate arg that reduces the number of points.
    # https://scikit-learn.org/stable/modules/generated/sklearn.metrics.roc_curve.html#sklearn.metrics.roc_curve.
    # PRC lacks this arg, so we manually reduce to 1000 points as a rough estimate.
    nth_point = math.ceil(len(prc_thresholds) / 1000)
    prc_points = list(zip(precision, recall, prc_thresholds))[::nth_point]
    with open(artifacts.prc, "w") as fd:
        json.dump(
            {
                "prc": [
                    {"precision": p, "recall": r, "threshold": t}
                    for p, r, t in prc_points
                ]
            },
            fd,
            indent=4,
        )

    with open(artifacts.roc, "w") as fd:
        json.dump(
            {
                "prc": [
                    {"precision": p, "recall": r, "threshold": t}
                    for p, r, t in prc_points
                ]
            },
            fd,
            indent=4,
        )

    with open(artifacts.roc, "w") as fd:
        json.dump(
            {
                "roc": [
                    {"fpr": fp, "tpr": tp, "threshold": t}
                    for fp, tp, t in zip(fpr, tpr, roc_thresholds)
                ]
            },
            fd,
            indent=4,
        )

    exec_metrics = dict(avg_prec=avg_prec, roc_auc=roc_auc)
    with open(artifacts.scores, "w") as fd:
        json.dump(exec_metrics, fd, indent=4)
    _ = metawriter.log_execution_metrics("metrics", exec_metrics)

def test_cli(model_dir: str, dataset_dir: str, output_dir: str) -> None:
    test(model_dir, dataset_dir, output_dir)

test_cli('artifacts/model', 'artifacts/features', 'artifacts/test_results')

*** Note: CMF will check out a new branch in git to commit the metadata files ***
*** The checked out branch is mlmd. ***


\u2819 Checking graph graph
\u280b Checking graph graph
[?25h

In [None]:
__all__ = ['query']


def _print_executions_in_stage(cmf_query: cmfquery.CmfQuery, stage_name: str) -> None:
    print('\n')
    print('\n')
    df: pd.DataFrame = cmf_query.get_all_executions_in_stage(stage_name)
    df.drop(columns=['Git_Start_Commit', 'Git_End_Commit'], inplace=True, axis=1)
    print(tabulate(df, headers='keys', tablefmt='psql'))


def query(mlmd_path: str) -> None:
    cmf_query = cmfquery.CmfQuery(mlmd_path)
    stages: t.List[str] = cmf_query.get_pipeline_stages("Test-env")
    print(stages)

    for stage in stages:
        _print_executions_in_stage(cmf_query, stage)


def query_cli(mlmd_path: str):
    query(mlmd_path)

query_cli('mlmd')

In [10]:
# Start cmf-server ui-server to push mlmd 
_=metadata_push("Test-env","./mlmd")

mlmd is successfully pushed.


In [11]:
#PUSHING ARTIFACTS TO CMF-SERVER
_=artifact_push()

6 files pushed
