In [1]:
import argparse
import os

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.sql import SparkSession
from pyspark.ml.classification import GBTClassifier
import mlflow

In [2]:
LABEL_COL = 'has_car_accident'

In [3]:
def build_pipeline(train_alg):
    """
    Создание пайплаина над выбранной моделью.

    :return: Pipeline
    """
    sex_indexer = StringIndexer(inputCol='sex',
                                outputCol="sex_index")
    car_class_indexer = StringIndexer(inputCol='car_class',
                                      outputCol="car_class_index")
    features = ["age", "sex_index", "car_class_index", "driving_experience",
                "speeding_penalties", "parking_penalties", "total_car_accident"]
    assembler = VectorAssembler(inputCols=features, outputCol='features')
    return Pipeline(stages=[sex_indexer, car_class_indexer, assembler, train_alg])

In [4]:
def evaluate_model(evaluator, predict, metric_list):
    for metric in metric_list:
        evaluator.setMetricName(metric)
        score = evaluator.evaluate(predict)
        mlflow.log_metric(f"{metric}", score)
        print(f"{metric} score = {score}")

In [5]:
def optimization(pipeline, gbt, train_df, evaluator):
    grid = ParamGridBuilder() \
        .addGrid(gbt.maxDepth, [3, 5]) \
        .addGrid(gbt.maxIter, [20, 30]) \
        .addGrid(gbt.maxBins, [16, 32]) \
        .build()
    tvs = TrainValidationSplit(estimator=pipeline,
                               estimatorParamMaps=grid,
                               evaluator=evaluator,
                               trainRatio=0.8)
    models = tvs.fit(train_df)
    return models.bestModel

In [6]:
def start_mf():
    mlflow.start_run()
    mlflow.set_tracking_uri("https://mlflow.lab.karpov.courses")
    mlflow.set_experiment(experiment_name = "e-lavrushkin!")

In [7]:
def finish_mf():
    mlflow.end_run()

In [8]:
def log_stage_mf(model):
    for i in range(0, len(model.stages)):
        stage = model.stages[i]
        mlflow.log_param(f'stage_{i}_type', stage)
        if type(stage) is VectorAssembler:
            mlflow.log_param(f'stage_{i}_input', stage.getInputCols())
            mlflow.log_param(f'stage_{i}_output', stage.getOutputCol())
        elif type(stage) is StringIndexerModel:
            mlflow.log_param(f'stage_{i}_input', stage.getInputCol())
            mlflow.log_param(f'stage_{i}_output', stage.getOutputCol())
        else:
            mlflow.log_param(f'stage_{i}_features', stage.getFeaturesCol())
            mlflow.log_param(f'stage_{i}_label', stage.getLabelCol())

In [9]:
def log_save_model_mf(model):
    mv = mlflow.spark.log_model(model,
                           artifact_path = "test",
                           registered_model_name="test")

In [10]:
def process(spark, train_path, test_path):
    """
    Основной процесс задачи.

    :param spark: SparkSession
    :param train_path: путь до тренировочного датасета
    :param test_path: путь до тренировочного датасета
    """
    evaluator = MulticlassClassificationEvaluator(labelCol=LABEL_COL, predictionCol="prediction", metricName='f1')
    train_df = spark.read.parquet(train_path)
    test_df = spark.read.parquet(test_path)

    gbt = GBTClassifier(labelCol=LABEL_COL)
    pipeline = build_pipeline(gbt)

    model = optimization(pipeline, gbt, train_df, evaluator)
    predict = model.transform(test_df)
    
    start_mf()
    log_stage_mf(model)
    
    evaluate_model(evaluator, predict, ['f1', 'weightedPrecision', 'weightedRecall', 'accuracy'])
    log_save_model_mf(model)
    finish_mf()
    print('Best model saved')

In [11]:
def main(train_path, test_path):
    spark = _spark_session()
    process(spark, train_path, test_path)

In [12]:
def _spark_session():
    """
    Создание SparkSession.

    :return: SparkSession
    """
    return SparkSession.builder.appName('PySparkMLJob').getOrCreate()

In [32]:
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--train', type=str, default='train.parquet', help='Please set train datasets path.')
    parser.add_argument('--test', type=str, default='test.parquet', help='Please set test datasets path.')
    args = parser.parse_args()
    train = args.train
    test = args.test
    main(train, test)

Exception: Run with UUID aaaeed11b813499aa1a24b5487029ca1 is already active. To start a new run, first end the current run with mlflow.end_run(). To start a nested run, call start_run with nested=True