In [None]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import urllib.request

In [None]:
# `sc` is not defined anywhere in this notebook; presumably it is the SparkContext
# pre-created by the PySpark kernel/shell (TODO confirm). Evaluating it as the cell's
# last expression displays its summary, a quick check that Spark is up.
sc

In [None]:
# Fetch the wine-quality dataset into the notebook's local working directory;
# the next cell copies it into HDFS.
url = "https://raw.githubusercontent.com/databricks/mlflow-example-sklearn-elasticnet-wine/master/wine-quality.csv"
local_csv = "wine.csv"
urllib.request.urlretrieve(url, filename=local_csv)

In [None]:
!hdfs dfs -put wine.csv wine.csv
!hdfs dfs -ls

In [None]:
# Read the wine CSV (presumably the copy uploaded to HDFS above; the relative path
# resolves against the cluster's default filesystem). The first row holds the column
# names, and Spark infers the column types by scanning the data.
inputDF = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("wine.csv")
)
inputDF.printSchema()

In [None]:
# Print the first three rows as a quick sanity check of the parsed data.
inputDF.show(n=3)

In [None]:
# NOTE(review): `Vectors` (old RDD-based mllib API) is not used in the visible cells.
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Every column except the label `quality` is a model input.
featureColumns = list(filter(lambda name: name != 'quality', inputDF.columns))

# Pack the feature columns, in `featureColumns` order, into a single vector column
# named `features`, the input column the regressor below is configured to read.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=featureColumns,
)

In [None]:
from pyspark.ml.regression import LinearRegression

# Hold out a quarter of the rows for testing; the fixed seed makes the split repeatable.
trainDF, testDF = inputDF.randomSplit(weights=[0.75, 0.25], seed=30)

# Linear regression reading the assembled vector column `features` and predicting the
# label column `quality`. The regParam/elasticNetParam values are only starting points:
# the parameter grid in the tuning cell overrides both.
lr = LinearRegression(
    featuresCol="features",
    labelCol="quality",
    maxIter=30,
    regParam=0.3,
    elasticNetParam=0.3,
)

# Chain feature assembly and regression so they are fitted and applied as one estimator.
pipeline = Pipeline(stages=[assembler, lr])

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
import mlflow
import mlflow.spark

# Grid over the L1/L2 mix and the regularisation strength of `lr`; these values
# override the ones passed to the LinearRegression constructor.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.elasticNetParam, [0.1, 0.2, 0.3, 0.4, 0.5])
    .addGrid(lr.regParam, [0.3, 0.1, 0.01])
    .build()
)

mlflow.set_experiment("Experimento Spark 2")

# RegressionEvaluator's documented default metric is RMSE (lower is better).
evaluator = RegressionEvaluator(labelCol="quality")

# 5-fold cross-validation on the training split only. The seed fixes the fold
# assignment so reruns produce the same avgMetrics (the train/test split is seeded too).
validator = CrossValidator(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           numFolds=5,
                           seed=30)
cvModel = validator.fit(trainDF)
pipeModel = cvModel.bestModel  # best pipeline, refitted on all of trainDF

# avgMetrics[i] is the mean cross-validation metric for the i-th param map.
# Choose the best index with builtins (numpy is never imported in this notebook) and
# honour the metric's direction instead of assuming lower is better. min/max return the
# first optimal index, matching np.argmin's tie-breaking.
metricName = evaluator.getMetricName()
cvMetrics = cvModel.avgMetrics
pickBest = max if evaluator.isLargerBetter() else min
besti = pickBest(range(len(cvMetrics)), key=cvMetrics.__getitem__)

# Score the chosen model once on the held-out test split, which was otherwise unused.
testMetric = evaluator.evaluate(pipeModel.transform(testDF))

# One MLflow run per grid point. Only the winning run also gets the test score
# and the serialized model.
for i, params in enumerate(cvModel.getEstimatorParamMaps()):
    with mlflow.start_run():
        for param, value in params.items():
            mlflow.log_param(param.name, value)
        mlflow.log_metric(metricName, cvMetrics[i])
        if i == besti:
            mlflow.log_metric("test_" + metricName, testMetric)
            mlflow.spark.log_model(pipeModel, "sparkmodel")

In [None]:
# Pair each feature name with its learned weight. Stage 1 of the best pipeline is the
# fitted regression model (stage 0 is the assembler). The assembler packs the columns
# in `featureColumns` order, so the two sequences line up.
coefficients = pipeModel.stages[1].coefficients
for pair in zip(featureColumns, coefficients):
    print(pair)