In [1]:
import pyspark

# Create (or reuse) the Spark session.  The MMLSpark package is requested
# through spark.jars.packages and resolved from the extra Maven repository.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("MyApp")
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1")
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
    .getOrCreate()
)

# NOTE(review): presumably imported only after getOrCreate() because the
# Python module is delivered with the package requested above -- confirm.
import mmlspark

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf
from pyspark.sql.functions import percent_rank
from pyspark.sql import Window
import pyspark.sql.functions  as F
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler,VectorIndexer


from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.regression import GBTRegressor
from pyspark.ml import Pipeline

import mlflow
from mlflow import spark

import boto3
import os
# Handles used by the rest of the notebook for reading data.
# Note: `from mlflow import spark` above rebinds the name `spark`, so it no
# longer refers to the SparkSession built in the first cell; the context is
# therefore obtained through SparkContext.getOrCreate() instead.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sparkContext=sc)

class UdfModelWrapper(mlflow.pyfunc.PythonModel):
    """pyfunc wrapper that restores column names before delegating to a logged model.

    The wrapped model is loaded from the artifact named ``model_artifact`` and
    receives the input with positionally named columns ("0", "1", ...) renamed
    to the entries of ``ordered_df_columns``.
    """

    def __init__(self, ordered_df_columns, model_artifact):
        """Store the column order and the key of the artifact to load.

        Args:
            ordered_df_columns: Column names, in the order matching the
                positional labels "0", "1", ... of the incoming frame.
            model_artifact: Key into ``context.artifacts`` for the wrapped model.
        """
        self.model_artifact = model_artifact
        self.ordered_df_columns = ordered_df_columns

    def load_context(self, context):
        """Load the wrapped model from ``context.artifacts`` as a pyfunc model."""
        # Imported locally, as in the original definition.
        import mlflow.pyfunc
        artifact_location = context.artifacts[self.model_artifact]
        self.spark_pyfunc = mlflow.pyfunc.load_model(artifact_location)

    def predict(self, context, model_input):
        """Rename positional columns and return the wrapped model's predictions.

        Args:
            context: Unused here.
            model_input: Object with a pandas-style ``rename(columns=...)``;
                presumably a pandas DataFrame whose columns are labelled with
                stringified positions -- confirm against the caller.

        Returns:
            Whatever the wrapped model's ``predict`` returns for the renamed input.
        """
        positional_to_named = {}
        for position, column_name in enumerate(self.ordered_df_columns):
            positional_to_named[str(position)] = column_name
        return self.spark_pyfunc.predict(
            model_input.rename(columns=positional_to_named)
        )

def log_udf_model(artifact_path, ordered_columns, run_id):
    """Log a ``UdfModelWrapper`` around an already-logged model.

    Args:
        artifact_path: Artifact path of the existing model inside the run; it
            is also used as the key of the wrapper's artifact dictionary.
        ordered_columns: Column names handed to ``UdfModelWrapper``.
        run_id: Run whose artifact ``artifact_path`` is being wrapped.

    Returns:
        The artifact path the wrapper was logged under (``udf-<artifact_path>``).
    """
    wrapper = UdfModelWrapper(ordered_columns, artifact_path)
    model_uri = f"runs:/{run_id}/{artifact_path}"
    udf_artifact_path = f"udf-{artifact_path}"

    mlflow.pyfunc.log_model(
        artifact_path=udf_artifact_path,
        python_model=wrapper,
        artifacts={artifact_path: model_uri},
    )
    return udf_artifact_path



  from collections import Mapping, MutableMapping


In [3]:
# Directory holding the `tpch-s1` Parquet tables.  Defaults to the original
# cluster location; set the TPCH_DIR environment variable to point the
# notebook at a different copy without editing the code.
TPCH_DIR = os.environ.get('TPCH_DIR', 'hdfs://isilon.tan.lab/tpch-s1')

customer = sqlContext.read.parquet(f'{TPCH_DIR}/customer.parquet')
lineitem = sqlContext.read.parquet(f'{TPCH_DIR}/lineitem.parquet')
order = sqlContext.read.parquet(f'{TPCH_DIR}/order.parquet')

In [4]:
# Apply DataFrame.dropna() (default arguments) to each of the three tables.
customer, lineitem, order = (
    table.dropna() for table in (customer, lineitem, order)
)

In [4]:
# Row counts after null removal, printed in the order customer, lineitem, order.
print(*(table.count() for table in (customer, lineitem, order)))

150000 6001215 1500000


In [5]:

# Share of the date-ordered series (measured by percent_rank) used for
# training; everything after it is held out for testing.
TRAIN_FRACTION = 0.8

# One row per order line belonging to a BUILDING-segment customer.
# Inner joins are used on purpose: the segment filter already discards line
# items without a matching order/customer, and an order without line items
# has no quantity to contribute (it could only produce a null daily total).
building_lines = (
    order
    .join(customer, order.o_custkey == customer.c_custkey, how='inner')
    .join(lineitem, lineitem.l_orderkey == order.o_orderkey, how='inner')
    .where(F.col('c_mktsegment') == 'BUILDING')
)

# Total quantity per order date.
daily_sales = (
    building_lines
    .groupBy('o_orderdate')
    .agg(F.sum('l_quantity').alias('TOTAL_SALES'))
    .withColumnRenamed('o_orderdate', 'ORDERDATE')
)

# Calendar features derived from the order date; DATE is the Unix timestamp
# and is also the ordering key for the chronological split below.
order_date = F.col('ORDERDATE')
daily_features = (
    daily_sales
    .withColumn('DATE', F.unix_timestamp(order_date))
    .withColumn('DAY', F.dayofmonth(order_date))
    .withColumn('WDAY', F.dayofweek(order_date))
    .withColumn('YDAY', F.dayofyear(order_date))
    .withColumn('WEEK', F.weekofyear(order_date))
)

# Chronological split: rank every day by DATE over one global window, then
# cut at TRAIN_FRACTION.  The helper column and the raw date are dropped so
# only the target and the numeric features remain.
sales = daily_features.withColumn(
    "rank", percent_rank().over(Window.partitionBy().orderBy("DATE"))
)
training = sales.where(F.col("rank") <= TRAIN_FRACTION).drop("rank", "ORDERDATE")
testing = sales.where(F.col("rank") > TRAIN_FRACTION).drop("rank", "ORDERDATE")


In [6]:
# Bring ten rows of the held-out set to the driver for a quick look.
testing_preview = testing.limit(10)
testing_preview.collect()

[Row(TOTAL_SALES=14047.0, DATE=860544000, DAY=9, WDAY=4, YDAY=99, WEEK=15),
 Row(TOTAL_SALES=13286.0, DATE=860630400, DAY=10, WDAY=5, YDAY=100, WEEK=15),
 Row(TOTAL_SALES=12456.0, DATE=860716800, DAY=11, WDAY=6, YDAY=101, WEEK=15),
 Row(TOTAL_SALES=14411.0, DATE=860803200, DAY=12, WDAY=7, YDAY=102, WEEK=15),
 Row(TOTAL_SALES=11975.0, DATE=860889600, DAY=13, WDAY=1, YDAY=103, WEEK=15),
 Row(TOTAL_SALES=15137.0, DATE=860976000, DAY=14, WDAY=2, YDAY=104, WEEK=16),
 Row(TOTAL_SALES=13954.0, DATE=861062400, DAY=15, WDAY=3, YDAY=105, WEEK=16),
 Row(TOTAL_SALES=13253.0, DATE=861148800, DAY=16, WDAY=4, YDAY=106, WEEK=16),
 Row(TOTAL_SALES=12366.0, DATE=861235200, DAY=17, WDAY=5, YDAY=107, WEEK=16),
 Row(TOTAL_SALES=12674.0, DATE=861321600, DAY=18, WDAY=6, YDAY=108, WEEK=16)]

In [13]:
from mmlspark.lightgbm import LightGBMRegressor

# Rename the target first.  withColumnRenamed leaves the frame unchanged when
# TOTAL_SALES is no longer present, so re-running this cell (after `training`
# has already been rebound) no longer fails the way list.remove() did.
training = training.withColumnRenamed("TOTAL_SALES", "label")

# Every column except the target is a model input.
featuresCols = [name for name in training.columns if name != "label"]
va = VectorAssembler(inputCols=featuresCols, outputCol="features")

# Column names are passed explicitly so the link between the rename above,
# the assembler output and the regressor is visible in the code.
lgbm = LightGBMRegressor(
    labelCol="label",
    featuresCol="features",
    learningRate=0.3,
    numIterations=100,
    numLeaves=31,
)

pipeline = Pipeline(stages=[va, lgbm])
model = pipeline.fit(training)


In [22]:
# Give the held-out frame the same target column name as the training frame,
# then score it with the fitted pipeline.
testing = testing.withColumnRenamed("TOTAL_SALES", "label")
predictions = model.transform(testing)

# RMSE between the regressor's label and prediction columns.
evaluator = RegressionEvaluator(
    labelCol=lgbm.getLabelCol(),
    predictionCol=lgbm.getPredictionCol(),
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print(rmse)

1521.494516507865


In [47]:
predictions.printSchema()

# Convert the Spark frames to pandas:
# df1 = training, df2 = testing, df3 = predictions.
df1, df2, df3 = (
    spark_frame.toPandas() for spark_frame in (training, testing, predictions)
)


root
 |-- label: double (nullable = true)
 |-- DATE: long (nullable = true)
 |-- DAY: integer (nullable = true)
 |-- WDAY: integer (nullable = true)
 |-- YDAY: integer (nullable = true)
 |-- WEEK: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [51]:
%%javascript
// Make the output area's _should_scroll check always answer false.
// NOTE(review): presumably so tall outputs (e.g. the figure below) are shown
// in full instead of inside a scroll box -- confirm.
IPython.OutputArea.prototype._should_scroll = (lines) => false;

<IPython.core.display.Javascript object>

In [52]:
import plotly.graph_objects as go

# One entry per line on the chart:
# (pandas frame, column plotted on y, legend name, line colour).
series = (
    (df1, 'label', "training", 'deepskyblue'),
    (df2, 'label', "testing", 'black'),
    (df3, 'prediction', "predictions", 'red'),
)

fig = go.Figure()
for frame, y_column, trace_name, colour in series:
    fig.add_trace(go.Scatter(
        x=frame.DATE,
        y=frame[y_column],
        name=trace_name,
        line_color=colour,
        opacity=0.8,
    ))

# Axis titles so the figure can be read on its own; DATE holds the Unix
# timestamp of the order date and the y values are daily quantity totals.
fig.update_layout(
    title_text="Retail : TPCH-S1 forecasting",
    xaxis_title_text="DATE (Unix epoch seconds)",
    yaxis_title_text="Total quantity sold per day",
)
fig.show()
