In [None]:
from os import getenv
from socket import gethostbyname, gethostname

In [None]:
# create SparkSession
from pyspark.sql import SparkSession
# Build (or reuse) the SparkSession for this notebook.
# The standalone master URL may be overridden with the SPARK_MASTER_URL
# environment variable; when it is unset, the original in-cluster master is used.
spark = (
    SparkSession.builder
    .master(getenv('SPARK_MASTER_URL', 'spark://first-spark-bdl-spark-master.first.svc.cluster.local:7077'))
    .appName("wine_quality_prediction")
    # Maven coordinates of the MLeap Spark package, needed for bundle serialization.
    # NOTE(review): the _2.11 suffix presumably must match the cluster's Scala version.
    .config('spark.jars.packages', 'ml.combust.mleap:mleap-spark_2.11:0.13.0')
    # Advertise the driver by its resolved IP address so the cluster can reach it.
    .config("spark.driver.host", gethostbyname(gethostname()))
    .getOrCreate()
)

In [None]:
import pandas as pd
import mleap.pyspark
import mleap.sklearn.base

# UCI Wine Quality dataset: two semicolon-delimited CSV files (red / white variants).
red_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
white_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-white.csv"
url_1, url_2 = red_url, white_url
# Only the white-wine file is loaded; url_1 (red) is currently unused.
data = pd.read_csv(filepath_or_buffer=url_2, sep=';')

In [None]:
# Convert the pandas frame to a Spark DataFrame, reusing the CSV header names as the schema.
df = spark.createDataFrame(data, schema=list(data.columns))
# Spark ML estimators in this notebook read their target from a column named "label".
df_label = df.withColumnRenamed("quality", "label")
# Feature columns: everything except the target.
cols = list(df_label.columns)
cols.remove("label")

In [None]:
from pyspark.ml.feature import VectorAssembler
# Pack the individual feature columns into a single vector column named "features".
assembler = VectorAssembler(
    outputCol="features",
    inputCols=cols,
)

In [None]:
from sklearn.model_selection import train_test_split
# Split the data randomly into ~80% training and ~20% test rows; the fixed seed
# makes the split repeatable for the same input partitioning.
# NOTE(review): sklearn's train_test_split imported above is unused here.
split_weights = [0.8, 0.2]
train, test = df_label.randomSplit(weights=split_weights, seed=12345)

In [None]:
from pyspark.ml.regression import LinearRegression
# Elastic-net regularized linear regression:
#   regParam        - overall regularization strength
#   elasticNetParam - L1/L2 mix in [0, 1] (0 = pure L2, 1 = pure L1)
lr = LinearRegression(
    featuresCol='features',
    labelCol='label',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)

In [None]:
from pyspark.ml import Pipeline
import mleap.sklearn.pipeline
# Chain feature assembly and regression so they are fitted and exported as one unit.
pipeline_stages = [assembler, lr]
pipeline = Pipeline(stages=pipeline_stages)

In [None]:
# Fitting the pipeline on the training split produces a PipelineModel,
# which is itself a Transformer and can later be applied with .transform().
fitted_pipeline = pipeline.fit(dataset=train)

In [None]:
# Show the fitted PipelineModel's repr as this cell's output.
fitted_pipeline

In [None]:
from mleap.pyspark.spark_support import SimpleSparkSerializer
# Serialize the fitted pipeline to an MLeap bundle zip under /tmp.
# NOTE(review): `serializeToBundle` is presumably made available on Spark models
# by the mleap spark_support import above — confirm against the MLeap docs.
# `model_version` is also read by the upload cell below via `$model_version`.
model_version = 1.3
bundle_path = "jar:file:/tmp/wq_mleap-bundle-{}.zip".format(model_version)
# MLeap uses a transformed DataFrame to capture the bundle's schema. `df_label`
# already has the "label" column, so no further rename is needed (renaming the
# absent "quality" column was a no-op).
fitted_pipeline.serializeToBundle(bundle_path, fitted_pipeline.transform(df_label))


In [None]:
%%script env  model_version="$model_version" bash
# Copy the MLeap bundle into /models with the `bdl` CLI, then delete the local file.
# Abort on the first failure (-e) and on unset variables (-u), so the local
# bundle is removed only after it has been copied successfully.
set -eu
bundle="/tmp/wq_mleap-bundle-${model_version}.zip"
bdl -mkdir -p /models
bdl -copyFromLocal -f "$bundle" /models/
rm "$bundle"

In [None]:
# Stop the SparkSession (and its underlying SparkContext) to release cluster resources.
spark.stop()