In [None]:
# Reload imported modules automatically before each cell executes, so edits to
# local modules (e.g. model_service) are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
# Substep parameters used when the notebook is run interactively.
# During a job run this entire cell is swapped out for the parameters read from
# the JSON file in the params subfolder, so keep it to this single assignment.
substep_params = {}

In [None]:
# load pipeline and step parameters - do not edit
# pipeline_params / step_params are passed to NotebookSubstep in the interface cell.
# pprint=True presumably pretty-prints the loaded values to the cell output (TODO confirm).
from sinara.substep import get_pipeline_params, get_step_params
pipeline_params = get_pipeline_params(pprint=True)
step_params = get_step_params(pprint=True)

In [None]:
#3 define substep interface
from sinara.substep import NotebookSubstep, ENV_NAME, PIPELINE_NAME, ZONE_NAME, STEP_NAME, RUN_ID, ENTITY_NAME, ENTITY_PATH, SUBSTEP_NAME

# Every input of this substep is produced by the upstream "data_prep" step;
# list the entity names once and expand them into interface descriptors below.
DATA_PREP_ENTITIES = (
    "df_X_train",
    "df_Y_train",
    "df_X_eval",
    "df_Y_eval",
    "df_X_test",
    "df_Y_test",
)

substep = NotebookSubstep(pipeline_params, step_params, substep_params)

substep.interface(
    inputs=[
        {STEP_NAME: "data_prep", ENTITY_NAME: entity_name}
        for entity_name in DATA_PREP_ENTITIES
    ],
    # The single artifact this substep publishes: the packed BentoML service.
    outputs=[
        {ENTITY_NAME: "bento_service"},
    ],
)

substep.print_interface_info()

substep.exit_in_visualize_mode()

In [None]:
#4 run spark
from sinara.spark import SinaraSpark

# `spark` is the session whose `spark.read.parquet` loads the inputs in the next cell.
# NOTE(review): the meaning of the positional argument 0 is not visible here —
# confirm against the SinaraSpark.run_session documentation.
spark = SinaraSpark.run_session(0)
# Left as the last expression so its return value (presumably the Spark UI URL) is displayed.
SinaraSpark.ui_url()

In [None]:
#5 read inputs
prev_step_inputs = substep.inputs(step_name="data_prep")


def _read_pandas_on_spark(path):
    """Read one parquet dataset with Spark and return it as a pandas-on-Spark frame."""
    return spark.read.parquet(path).to_pandas_on_spark()


df_X_train = _read_pandas_on_spark(prev_step_inputs.df_X_train)
df_Y_train = _read_pandas_on_spark(prev_step_inputs.df_Y_train)
df_X_eval = _read_pandas_on_spark(prev_step_inputs.df_X_eval)
df_Y_eval = _read_pandas_on_spark(prev_step_inputs.df_Y_eval)
df_X_test = _read_pandas_on_spark(prev_step_inputs.df_X_test)
df_Y_test = _read_pandas_on_spark(prev_step_inputs.df_Y_test)

In [None]:
#6 Train the model predicting median house price (MEDV)
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import root_mean_squared_error

# Fixed seed so that re-running the step reproduces the same model.
RANDOM_STATE = 239

# pyspark.pandas recommends to_numpy() over .values; either way the whole
# frame is collected into driver memory as a NumPy array.
X_train = df_X_train.to_numpy()

# df_Y_train is a DataFrame, so to_numpy() gives a 2-D (n, 1) array.
# scikit-learn regressors expect a 1-D target and emit DataConversionWarning
# for a column vector, so check there is exactly one target column and flatten it.
y_train_2d = df_Y_train.to_numpy()
assert y_train_2d.ndim == 2 and y_train_2d.shape[1] == 1, (
    f"expected a single target column in df_Y_train, got shape {y_train_2d.shape}"
)
y_train = y_train_2d.ravel()

reg = GradientBoostingRegressor(random_state=RANDOM_STATE)
reg.fit(X_train, y_train)

In [None]:
#7 check trained model quality on eval dataset using RMSE
# Materialize ground truth first, then predictions (same order as a single nested call).
y_eval_true = df_Y_eval.values
y_eval_pred = reg.predict(df_X_eval.values)

rmse = root_mean_squared_error(y_eval_true, y_eval_pred)
print(f"The root mean squared error (RMSE) on eval set: {rmse:.4f}")

In [None]:
#8 create and save a bentoservice
from model_service import ModelService
from sinara.bentoml import save_bentoservice

# Output locations declared in the substep interface (bento_service).
outputs = substep.outputs()

# Ship the held-out test split with the service as lists of row dicts
# (to_dict(orient='records') yields one dict per row).
test_data = {
    'X': df_X_test.to_dict(orient='records'),
    'Y': df_Y_test.to_dict(orient='records'),
}

# Pack the trained regressor and the test samples into the service, then persist it.
model = ModelService()
model.pack('model', reg)
model.pack('test_data', test_data)

save_bentoservice(model, path=outputs.bento_service, substep=substep)

In [None]:
#9 stop spark
# Stop the Spark session via SinaraSpark once the bento service has been saved.
SinaraSpark.stop_session()