# Workflow

## data_generator.py

In [1]:
import subprocess
import pandas as pd
import pyspark
import logging

from pyspark.sql import SQLContext, Row, SparkSession
from sklearn.datasets import make_classification

# Root logger setup: messages are prefixed with a day-month-year timestamp,
# and the root level is forced to INFO so the logging.info() calls below show.
logging.basicConfig(format='%(asctime)s - %(message)s', 
                    datefmt='%d-%b-%y %H:%M:%S')
logging.getLogger().setLevel(logging.INFO)

def generate_data(path, file_system='local', **args):
    """Generate a synthetic classification dataset and persist it.

    The feature matrix and labels come from
    ``sklearn.datasets.make_classification``. Features are stored in columns
    ``x_1`` .. ``x_n`` and the label in column ``y``.

    Args:
        path: Destination of the dataset. A CSV file path when
            ``file_system`` is ``'local'``; a Parquet location handed to
            Spark when it is ``'hdfs'``.
        file_system: Either ``'local'`` or ``'hdfs'``.
        **args: Forwarded unchanged to ``make_classification``, e.g.
            ``n_samples=100, n_features=4, class_sep=1.0, n_informative=2,
            n_redundant=2, random_state=1``.

    Raises:
        ValueError: If ``file_system`` is not a supported value. Previously
            the function generated the data and then silently wrote nothing.
    """
    supported_file_systems = ('local', 'hdfs')
    # Validate before doing any work so a typo fails fast instead of
    # producing no output at all.
    if file_system not in supported_file_systems:
        raise ValueError(
            f'Unsupported file_system {file_system!r}; '
            f'expected one of {supported_file_systems}')

    X, y = make_classification(**args)

    df = pd.DataFrame(X, columns=[f'x_{i + 1}' for i in range(X.shape[1])])
    df['y'] = y

    if file_system == 'local':
        df.to_csv(path, index=False)
        print(df.head())
        logging.info(f'Dataset was generated successfully and saved in {path} ')
    else:
        cluster_manager = 'yarn'
        spark = (SparkSession.builder
                 .master(cluster_manager)
                 .appName("myapp")
                 .config("spark.driver.allowMultipleContexts", "true")
                 .getOrCreate())

        spark_df = spark.createDataFrame(df)
        spark_df.show(5)
        # Only the first 10000 rows are written; larger datasets are truncated.
        spark_df.limit(10000).write.mode('overwrite').parquet(path)
        logging.info(f'Dataset was generated successfully and saved in hdfs://{path} ')
    
        
    

In [2]:
# Write the dataset to a local CSV; every keyword after file_system is
# forwarded to make_classification via **args.
generate_data(path='/tmp/original_data.csv', file_system='local',
                           n_samples=100,n_features=4,
                            class_sep=1.0, n_informative=2,
                            n_redundant=2, random_state=1)


17-Jun-19 11:51:30 - Dataset was generated successfully and saved in /tmp/original_data.csv 


        x_1       x_2       x_3       x_4  y
0  2.257082  3.174625  2.783862 -0.478191  0
1  1.350390  1.949810  1.760475 -0.403942  1
2  1.159745  1.119558  0.468043  0.949132  1
3 -1.360882 -1.425550 -0.759553 -0.852599  0
4  0.033744 -0.675969 -1.319113  1.682271  1


## data_gathering.py

In [2]:
import mlflow
# Create a tracking client; the bare name on the last line echoes its repr.
client = mlflow.tracking.MlflowClient()
client

<mlflow.tracking.client.MlflowClient at 0x7fb7a0612278>

In [4]:
# Ask the tracking client for its experiments and echo the result.
expr = client.list_experiments()
expr

[]

In [9]:
# Inspect where MLflow would store artifacts (see the path echoed below).
mlflow.get_artifact_uri()

'/root/project/mlruns_modified/0/0d14d05b5c604619abd96b08e7c61328/artifacts'

In [15]:
# Re-create the tracking client (same statement as the earlier cell).
client = mlflow.tracking.MlflowClient()
client

<mlflow.tracking.client.MlflowClient at 0x7ff9ba675f28>

In [20]:
# Look up a single experiment by its name and echo its metadata.
client.get_experiment_by_name('exp1')

<Experiment: artifact_location='/root/project/mlruns_modified/0', experiment_id='0', lifecycle_stage='active', name='exp1'>

In [1]:
from __future__ import print_function

import requests
import tempfile
import os
import zipfile
import pyspark
import mlflow
import click
import subprocess
import glob
import sklearn
import logging
import pandas as pd

from pyspark.sql import SQLContext, Row, SparkSession

print(mlflow.__version__) # it must be 1.0

uri = '/root/project/mlruns_modified'
exp_name = 'exp_workflow'
mlflow.set_tracking_uri(uri)

artifact_location = os.path.join('hdfs:///tmp', exp_name)

# create_experiment() raises when the name is already taken, so re-running
# this cell (or running it after another script created the experiment)
# used to fail. Look the experiment up first and create it only when missing.
_existing_experiment = mlflow.tracking.MlflowClient().get_experiment_by_name(exp_name)
if _existing_experiment is None:
    exp_id = mlflow.create_experiment(exp_name,
                                      artifact_location=artifact_location)
else:
    exp_id = _existing_experiment.experiment_id

print(f'exp_name = {exp_name} | exp_id = {exp_id}')
print(f'artifact_location = {artifact_location}')
# NOTE(review): exp_id is only printed; start_run() below is not pointed at
# this experiment (no set_experiment / experiment_id) — confirm that is intended.
# @click.command(help="Script that gather a dataset from a source data / repository")
# @click.option("--path", help="Dataset's path readable by Spark. Located in HDFS.")
def gather_data(source):
    """Copy a local CSV into Parquet and register the CSV as an MLflow artifact.

    Inside a fresh MLflow run, the CSV at ``source`` is read with pandas,
    converted to a Spark DataFrame, and written (first 10000 rows, overwrite
    mode) to ``original_data.parquet`` under the module-level
    ``artifact_location``. The original CSV is then logged as a run artifact.

    Args:
        source: Path of the CSV file, readable by pandas.
    """
    with mlflow.start_run():
        session_builder = SparkSession.builder.master('yarn').appName("gather_data")
        session_builder = session_builder.config(
            "spark.driver.allowMultipleContexts", "true")
        spark = session_builder.getOrCreate()

        pandas_df = pd.read_csv(source)
        print(pandas_df.head())
        logging.info(f'Dataset: {source} was read successfully ')

        capped_df = spark.createDataFrame(pandas_df).limit(10000)
        parquet_path = os.path.join(artifact_location, 'original_data.parquet')
        capped_df.write.mode('overwrite').parquet(parquet_path)
        logging.info(f'Dataset was saved in {parquet_path} ')

        mlflow.log_artifact(source)


# if __name__ == '__main__':
# Run the step directly on the CSV path that the generate_data cell wrote to.
gather_data(source='/tmp/original_data.csv')


1.0.0
exp_name = exp_workflow | exp_id = 0
artifact_location = hdfs:///tmp/exp_workflow
        x_1       x_2       x_3       x_4  y
0  2.257082  3.174625  2.783862 -0.478191  0
1  1.350390  1.949810  1.760475 -0.403942  1
2  1.159745  1.119558  0.468043  0.949132  1
3 -1.360882 -1.425550 -0.759553 -0.852599  0
4  0.033744 -0.675969 -1.319113  1.682271  1


## preprocessing.py

In [11]:
import os
import zipfile
import pyspark
import mlflow
import click
import sklearn
import logging
import tempfile

# exp_name = 'exp_workflow'
# artifact_location = os.path.join('hdfs:///tmp', exp_name)
# Same experiment name and HDFS artifact root as the data_gathering cell.
exp_name = 'exp_workflow'
artifact_location = os.path.join('hdfs:///tmp', exp_name)

def preprocessing(source, target):
    """Drop incomplete rows from a Parquet dataset and log a summary to MLflow.

    Inside an MLflow run (nested when a run is already active), the dataset
    at ``source`` is read with Spark and rows containing nulls are dropped.
    The first 10000 cleaned rows are written to ``target``. Sample count,
    feature count, column dtypes, label classes and problem type are logged
    as params, and a CSV copy of the cleaned data is logged as an artifact.

    Args:
        source: Parquet location readable by Spark.
        target: Parquet location for the cleaned dataset (overwritten).
    """
    # SparkSession is not imported in this cell; import it here so the
    # function does not depend on an earlier notebook cell having run.
    from pyspark.sql import SparkSession

    with mlflow.start_run(nested=True):
        cluster_manager = 'yarn'
        spark = (SparkSession.builder
                 .master(cluster_manager)
                 .appName("preprocessing")  # was the copy-pasted "gather_data"
                 .config("spark.driver.allowMultipleContexts", "true")
                 .getOrCreate())

        spark_df = spark.read.parquet(source)
        logging.info(f'Dataset: hdfs://{source} was read successfully ')
        spark_df.show(5)

        # Some preprocessing steps here
        spark_df = spark_df.dropna()
        spark_df.limit(10000).write.mode('overwrite').parquet(target)
        logging.info(f'Cleaned dataset was saved in {target} ')

        df_cleaned = spark_df.toPandas()
        mlflow.log_param(key='n_samples', value=len(df_cleaned))
        mlflow.log_param(key='n_features', value=len(df_cleaned.columns) - 1)
        # Fix: the original read an undefined ``df`` here (NameError).
        dict_types = {column: str(dtype)
                      for column, dtype in df_cleaned.dtypes.items()}
        mlflow.log_param(key='dtypes', value=dict_types)
        mlflow.log_param(key='classes', value=df_cleaned['y'].unique())
        mlflow.log_param(key='problem_type', value='classification')

        # The scratch directory is removed on exit; the original mkdtemp()
        # directory was never cleaned up.
        with tempfile.TemporaryDirectory() as tmpdir:
            tmp_file = os.path.join(tmpdir, 'cleaned_data.csv')
            df_cleaned.to_csv(tmp_file, index=False)
            mlflow.log_artifact(tmp_file)
        
#         mlflow.log_artifact(path, path_artifact)
#         mlflow.log_artifact('/root/project/ui_run', path_artifact)


# if __name__ == '__main__':
# Clean the gathered dataset. NOTE(review): presumably these scheme-less paths
# resolve to the hdfs:///tmp/exp_workflow location written by gather_data — confirm.
preprocessing('/tmp/exp_workflow/original_data.parquet', '/tmp/exp_workflow/cleaned_data.parquet')

+--------------------+-------------------+-------------------+--------------------+---+
|                 x_1|                x_2|                x_3|                 x_4|  y|
+--------------------+-------------------+-------------------+--------------------+---+
|   2.257081698491103| 3.1746248283618392|   2.78386208858466| -0.4781910157533783|  0|
|  1.3503902613309968| 1.9498100498575857| 1.7604752337080989|-0.40394190691776144|  1|
|   1.159745242939114| 1.1195576811232828|  0.468043236945129|  0.9491322319035848|  1|
| -1.3608824689920729| -1.425549824985629|-0.7595532711067904| -0.8525989473220963|  0|
|0.033743594128649315|-0.6759687711021607|-1.3191129184946184|  1.6822707033005866|  1|
+--------------------+-------------------+-------------------+--------------------+---+
only showing top 5 rows



## feature_engineering.py

TODO

## modeling.py

In [25]:
import os
import zipfile
import pyspark
import mlflow
import click
import sklearn
import logging
import tempfile
import keras

from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Dense
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.regularizers import l2
# exp_name = 'exp_workflow'
# artifact_location = os.path.join('hdfs:///tmp', exp_name)

def modeling(source, target):
    """Train and evaluate a small Keras binary classifier on a Parquet dataset.

    Inside an MLflow run (nested when a run is already active), the dataset
    at ``source`` is read with Spark, converted to pandas and split 80/20
    into train and test sets. A network with one hidden layer is fitted on
    it, and train and test accuracy are logged as MLflow metrics.

    Args:
        source: Parquet location readable by Spark; must contain the label
            column ``y``; all other columns are used as features.
        target: Currently unused.
    """
    # SparkSession is not imported in this cell; import it here so the
    # function does not depend on an earlier notebook cell having run.
    from pyspark.sql import SparkSession

    with mlflow.start_run(nested=True):
        cluster_manager = 'yarn'
        spark = (SparkSession.builder
                 .master(cluster_manager)
                 .appName("modeling")  # was the copy-pasted "gather_data"
                 .config("spark.driver.allowMultipleContexts", "true")
                 .getOrCreate())

        spark_df = spark.read.parquet(source)
        logging.info(f'Dataset: hdfs://{source} was read successfully ')
        spark_df.show(5)

        df = spark_df.toPandas()
        X = df.loc[:, df.columns != 'y']
        y = df['y']
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42)

        model = Sequential()
        # Size the input layer from the data instead of hard-coding 4 features.
        model.add(Dense(30, input_dim=X_train.shape[1],
                        kernel_initializer='normal', activation='relu'))
        model.add(Dense(1, kernel_initializer='normal', activation='sigmoid'))
        # Compile model
        model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

        early_stopping = EarlyStopping(monitor='val_loss', min_delta=0.0001, patience=2, mode='auto')

        # Fix: the callback was created but never handed to fit(), so early
        # stopping was never applied.
        model.fit(X_train, y_train, validation_split=.2, verbose=2, epochs=10,
                  batch_size=128, shuffle=False, callbacks=[early_stopping])

        # evaluate() returns [loss, accuracy] for the metrics compiled above.
        train_scores = model.evaluate(X_train, y_train, verbose=2)
        print(train_scores)
        test_scores = model.evaluate(X_test, y_test, verbose=2)
        print(test_scores)
        mlflow.log_metric("train_acc", round(train_scores[1], 2))
        mlflow.log_metric("test_acc", round(test_scores[1], 2))

        # Report the accuracy itself rather than the whole [loss, acc] list.
        print('The model had a ACC on the test set of {0}'.format(test_scores[1]))
        # TODO: Check the right path of  the keras model (artifact)
#         mlflow.keras.log_model(model, "models")
        #     mlflow.keras.save_model(model, "keras-model")
        
#         mlflow.log_artifact(path, path_artifact)
#         mlflow.log_artifact('/root/project/ui_run', path_artifact)


# if __name__ == '__main__':
# Train on the cleaned dataset written by the preprocessing call; the second
# argument (target) is not used by modeling().
modeling('/tmp/exp_workflow/cleaned_data.parquet', '/tmp/exp_workflow/final_data.parquet')

+--------------------+-------------------+-------------------+--------------------+---+
|                 x_1|                x_2|                x_3|                 x_4|  y|
+--------------------+-------------------+-------------------+--------------------+---+
|   2.257081698491103| 3.1746248283618392|   2.78386208858466| -0.4781910157533783|  0|
|  1.3503902613309968| 1.9498100498575857| 1.7604752337080989|-0.40394190691776144|  1|
|   1.159745242939114| 1.1195576811232828|  0.468043236945129|  0.9491322319035848|  1|
| -1.3608824689920729| -1.425549824985629|-0.7595532711067904| -0.8525989473220963|  0|
|0.033743594128649315|-0.6759687711021607|-1.3191129184946184|  1.6822707033005866|  1|
+--------------------+-------------------+-------------------+--------------------+---+
only showing top 5 rows

Train on 64 samples, validate on 16 samples
Epoch 1/10
 - 1s - loss: 0.6933 - acc: 0.4219 - val_loss: 0.6884 - val_acc: 0.6250
Epoch 2/10
 - 0s - loss: 0.6923 - acc: 0.4219 - val_loss

## main.py

In [2]:
"""
Orchestrates the multistep MLflow workflow: each step is an MLproject entry
point that is launched, or reused from an earlier matching run, by this script.
Only the data_gathering step is wired in so far.

See README.rst for more details.
"""

import click
import os


import mlflow
from mlflow.utils import mlflow_tags
from mlflow.entities import RunStatus
from mlflow.utils.logging_utils import eprint
import six

from mlflow.tracking.fluent import _get_experiment_id

print(mlflow.__version__) # it must be 1.0

uri = '/root/project/mlruns_modified'
mlflow.set_tracking_uri(uri)

exp_name = 'exp_workflow'
artifact_location = os.path.join('hdfs:///tmp', exp_name)

# create_experiment() raises when the name is already taken, so a second
# execution of this script against the same tracking store used to fail.
# Reuse the existing experiment and create it only when it is missing.
_existing_experiment = mlflow.tracking.MlflowClient().get_experiment_by_name(exp_name)
if _existing_experiment is None:
    exp_id = mlflow.create_experiment(exp_name,
                                      artifact_location=artifact_location)
else:
    exp_id = _existing_experiment.experiment_id

print(f'exp_name = {exp_name} | exp_id = {exp_id}')
print(f'artifact_location = {artifact_location}')


def _already_ran(entry_point_name, parameters, git_commit, experiment_id=None):
    """Best-effort lookup of an earlier, reusable run of an entry point.

    Runs of the experiment are examined in the reverse of the order returned
    by ``list_run_infos``. A run is reusable when its entry point tag equals
    ``entry_point_name``, every item of ``parameters`` equals the run's logged
    param of the same key, its status is FINISHED, and its git commit tag
    equals ``git_commit``.

    Args:
        entry_point_name: Entry point name to match against the run's
            ``MLFLOW_PROJECT_ENTRY_POINT`` tag.
        parameters: Mapping of param name to the value the run must have
            (the run may have additional params).
        git_commit: Source version to match against the run's
            ``MLFLOW_GIT_COMMIT`` tag.
        experiment_id: Experiment to search; defaults to the id returned by
            ``_get_experiment_id()``.

    Returns:
        The first matching run, or ``None`` when no run matches.
    """
    experiment_id = experiment_id if experiment_id is not None else _get_experiment_id()
    client = mlflow.tracking.MlflowClient()
    all_run_infos = reversed(client.list_run_infos(experiment_id))
    for run_info in all_run_infos:
        full_run = client.get_run(run_info.run_id)
        tags = full_run.data.tags
        if tags.get(mlflow_tags.MLFLOW_PROJECT_ENTRY_POINT, None) != entry_point_name:
            continue

        run_params = full_run.data.params
        if any(run_params.get(param_key) != param_value
               for param_key, param_value in parameters.items()):
            continue

        if run_info.status != RunStatus.FINISHED:
            eprint(("Run matched, but is not FINISHED, so skipping "
                    "(run_id=%s, status=%s)") % (run_info.run_id, run_info.status))
            continue

        previous_version = tags.get(mlflow_tags.MLFLOW_GIT_COMMIT, None)
        if git_commit != previous_version:
            # Fix: both values must be passed as one tuple. Previously ``%``
            # bound to ``previous_version`` alone, raising TypeError (two
            # placeholders, one argument) instead of logging the message.
            eprint(("Run matched, but has a different source version, so skipping "
                    "(found=%s, expected=%s)") % (previous_version, git_commit))
            continue
        return client.get_run(run_info.run_id)
    eprint("No matching run has been found.")
    return None


# TODO(aaron): This is not great because it doesn't account for:
# - changes in code
# - changes in dependant steps
def _get_or_run(entrypoint, parameters, git_commit, use_cache=True):
    """Return a run for ``entrypoint``, reusing an earlier one when allowed.

    Args:
        entrypoint: Name of the project entry point to run.
        parameters: Parameters passed to the entry point, and used to match
            earlier runs.
        git_commit: Source version an earlier run must have to be reused.
        use_cache: When true, an earlier matching run found by
            ``_already_ran`` is returned instead of launching a new one.

    Returns:
        The reused run, or the run fetched by id after ``mlflow.run`` returns.
    """
    # Only search earlier runs when the result can actually be used; the
    # original queried the tracking store even with use_cache=False.
    if use_cache:
        existing_run = _already_ran(entrypoint, parameters, git_commit)
        if existing_run:
            print("Found existing run for entrypoint=%s and parameters=%s" % (entrypoint, parameters))
            return existing_run
    print("Launching new run for entrypoint=%s and parameters=%s" % (entrypoint, parameters))
    submitted_run = mlflow.run(".", entrypoint, parameters=parameters, use_conda=False)
    return mlflow.tracking.MlflowClient().get_run(submitted_run.run_id)


# @click.command()
# @click.option("--source", help="Dataset's path in local.")
# @click.option("--keras-hidden-units", default=20, type=int)
def workflow(source, keras_hidden_units):
    """Run the workflow steps under a single parent MLflow run.

    Only the "data_gathering" entry point is launched (or reused) so far.
    The parent run's git commit tag and the resulting step run are printed.

    Args:
        source: Local path of the dataset, passed to the step as ``source``.
        keras_hidden_units: Currently unused.
    """
    # Entry point names are defined in MLproject; each step's .py file
    # documents its own artifact directories.
    with mlflow.start_run() as parent_run:
        parent_tags = parent_run.data.tags
        git_commit = parent_tags.get(mlflow_tags.MLFLOW_GIT_COMMIT)
        print(git_commit)

        step_parameters = {'source': source}
        data_gathering_run = _get_or_run("data_gathering", step_parameters, git_commit)
        print(data_gathering_run)
#         ratings_csv_uri = os.path.join(gather_data_run.info.artifact_uri, "ratings-csv-dir")
#         etl_data_run = _get_or_run("etl_data",
#                                    {"ratings_csv": ratings_csv_uri,
#                                     "max_row_limit": max_row_limit},
#                                    git_commit)
#         ratings_parquet_uri = os.path.join(etl_data_run.info.artifact_uri, "ratings-parquet-dir")

#         # We specify a spark-defaults.conf to override the default driver memory. ALS requires
#         # significant memory. The driver memory property cannot be set by the application itself.
#         als_run = _get_or_run("als", 
#                               {"ratings_data": ratings_parquet_uri, "max_iter": str(als_max_iter)},
#                               git_commit)
#         als_model_uri = os.path.join(als_run.info.artifact_uri, "als-model")

#         keras_params = {
#             "ratings_data": ratings_parquet_uri,
#             "als_model_uri": als_model_uri,
#             "hidden_units": keras_hidden_units,
#         }
#         _get_or_run("train_keras", keras_params, git_commit, use_cache=False)


# if __name__ == '__main__':
#     workflow()

# Run the workflow on the local CSV; keras_hidden_units is accepted by
# workflow() but not used by it.
workflow(source='/tmp/original_data.csv',
        keras_hidden_units=20)


None


Run matched, but is not FINISHED, so skipping (run_id=241ab9690acf49d9b14488d632aa2bbf, status=FAILED)
No matching run has been found.
2019/06/18 08:48:23 INFO mlflow.projects: === Created directory /tmp/tmp8y_9ekmn for downloading remote URIs passed to arguments of type 'path' ===
2019/06/18 08:48:23 INFO mlflow.projects: === Running command 'python data_gathering.py --source /tmp/original_data.csv' in run with ID '3449c6349d124db1ba87f30af6a3c20c' === 


Launching new run for entrypoint=data_gathering and parameters={'source': '/tmp/original_data.csv'}


ExecutionException: Run (ID '3449c6349d124db1ba87f30af6a3c20c') failed

In [33]:
# Manual re-run of the first workflow step outside workflow().
# NOTE(review): this passes the entry point name "gather_data" whereas
# workflow() passes "data_gathering" — confirm which one MLproject defines.
source='/tmp/original_data.csv'
with mlflow.start_run() as active_run:
    git_commit = active_run.data.tags.get(mlflow_tags.MLFLOW_GIT_COMMIT)
    print(git_commit)
    gather_data_run = _get_or_run("gather_data", {'source': source}, git_commit)
    # NOTE(review): bare expression nested in the with-block; presumably the
    # notebook does not echo it — use print() if the run should be shown.
    gather_data_run

None


Run matched, but is not FINISHED, so skipping (run_id=7aa72ee90c124734bb393e20b8f666cc, status=FAILED)
No matching run has been found.
2019/06/18 08:35:54 INFO mlflow.projects: === Created directory /tmp/tmpto_zwkg4 for downloading remote URIs passed to arguments of type 'path' ===
2019/06/18 08:35:54 INFO mlflow.projects: === Running command 'python gather_data.py --source /tmp/original_data.csv' in run with ID '4033914092f74eeaa1d01283bb1d1de3' === 


Launching new run for entrypoint=gather_data and parameters={'source': '/tmp/original_data.csv'}


ExecutionException: Run (ID '4033914092f74eeaa1d01283bb1d1de3') failed