In [1]:
import os
import tempfile

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

import mlflow.spark
from mlflow.tracking import MlflowClient

import pandas as pd

In [2]:
import os

# Connection settings for the MLflow tracking server and its S3-compatible
# artifact store (the endpoint port and credentials suggest a MinIO instance).
# NOTE(review): these credentials are hardcoded development fallbacks. Prefer
# exporting AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY in the kernel's environment
# (or using a secrets manager) and removing them from the notebook.
MLFLOW_ENV_DEFAULTS = {
    "MLFLOW_S3_ENDPOINT_URL": "http://192.168.0.14:9000",
    "MLFLOW_TRACKING_URI": "http://192.168.0.14:5001",
    "AWS_ACCESS_KEY_ID": "minio",
    "AWS_SECRET_ACCESS_KEY": "miniostorage",
}


def apply_env_defaults(defaults, environ=None):
    """Set each variable in `defaults` only where it is not already defined.

    Unlike plain assignment, values that are already present (e.g. exported in
    the shell that launched the kernel) are left untouched, so real
    configuration is not silently overwritten by the fallbacks.

    Args:
        defaults: mapping of environment-variable name -> fallback value.
        environ: mapping to update; defaults to ``os.environ``.
    """
    target = os.environ if environ is None else environ
    for name, value in defaults.items():
        target.setdefault(name, value)


apply_env_defaults(MLFLOW_ENV_DEFAULTS)

In [3]:
# Build the SparkSession for this notebook; getOrCreate() reuses an existing
# session if one is already active in the kernel.
spark = SparkSession.builder.appName("SparkMlflowExampleApp").getOrCreate()

In [4]:
# 1. Load the data and split it into train/test sets (seeded for reproducibility).
filePath = "./data/sf-airbnb-clean.parquet"
airbnbDF = spark.read.parquet(filePath)
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed = 42)

# 2. Index the categorical (string) columns.
# handleInvalid="skip" filters out rows with unseen labels or nulls at transform
# time, so any such rows in testDF are silently excluded from evaluation.
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]
stringIndexer = StringIndexer(inputCols = categoricalCols,
                              outputCols = indexOutputCols,
                              handleInvalid = "skip")

# 3. Combine the indexed categorical columns and the numeric columns into a single
#    "features" vector with VectorAssembler. "price" is the label, so it is excluded.
numericCols = [field for (field, dataType) in trainDF.dtypes
               if dataType == "double" and field != "price"]
assemblerInputs = indexOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols = assemblerInputs,
                               outputCol = "features")

# 4. Model: random forest regressor predicting "price".
# NOTE(review): maxBins=40 is presumably chosen to cover the largest categorical
# cardinality (Spark trees need maxBins >= #categories) — confirm against the data.
rf = RandomForestRegressor(labelCol = "price", maxBins = 40, maxDepth = 5, numTrees = 10, seed = 42)

# 5. Chain indexing, assembly and the model into one Pipeline.
pipeline = Pipeline(stages = [stringIndexer, vecAssembler, rf])

In [5]:
with mlflow.start_run(run_name = "random-forest") as run:
    # 1. Log the hyperparameters set explicitly on the model (maxBins included,
    #    so the run can be reproduced from its logged params alone).
    mlflow.log_param("num_trees", rf.getNumTrees())
    mlflow.log_param("max_depth", rf.getMaxDepth())
    mlflow.log_param("max_bins", rf.getMaxBins())

    # 2. Fit the whole pipeline and log the fitted PipelineModel under "model".
    pipelineModel = pipeline.fit(trainDF)
    mlflow.spark.log_model(pipelineModel, "model")

    # 3. Evaluate on the held-out test set and log the metrics (RMSE, R^2).
    predDF = pipelineModel.transform(testDF)
    regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price")
    rmse = regressionEvaluator.setMetricName("rmse").evaluate(predDF)
    r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
    mlflow.log_metrics({"rmse": rmse, "r2": r2})

    # 4. Log feature importances as a CSV artifact.
    rfModel = pipelineModel.stages[-1]
    # featureImportances is indexed by position in the "features" vector; toArray()
    # makes it a dense array. Pairing it with the assembler's input columns assumes
    # each assembler input contributes exactly one (scalar) slot.
    pandasDF = (pd.DataFrame(list(zip(vecAssembler.getInputCols(),
                                      rfModel.featureImportances.toArray())),
                             columns=["feature", "importance"])
                .sort_values(by = "importance", ascending = False))

    # log_artifact uploads an existing local file, so the CSV is written first.
    # A temporary directory keeps the working directory clean; the artifact is
    # still named "feature-importance.csv" (log_artifact uses the file's basename).
    with tempfile.TemporaryDirectory() as tmpDir:
        csvPath = os.path.join(tmpDir, "feature-importance.csv")
        pandasDF.to_csv(csvPath, index = False)
        mlflow.log_artifact(csvPath)



In [6]:
# Stop the SparkSession created earlier in the notebook once all work is logged.
spark.stop()