In [None]:
# Install the `wget` package, which a later cell imports to download the data set.
# `--user` installs it into the per-user site-packages directory.
!pip install wget --user 

In [None]:
# Download the GO Sales data set from the link below.
# The value returned by wget.download() is kept in `filename` and is what the
# Spark reader loads in a later cell.
import wget

link_to_data = 'https://apsportal.ibm.com/exchange-api/v1/entries/8044492073eb964f46597b4be06ff5ea/data?accessKey=9561295fa407698694b1e254d0099600'
filename = wget.download(link_to_data)

# print() with a single argument behaves the same on Python 2 and Python 3.
print(filename)

In [None]:
# Spark 2.2.0 initialization.
# Import SparkSession explicitly so this cell also runs on a fresh kernel that
# does not pre-populate the name.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Load the downloaded CSV file into a DataFrame: the first row is treated as
# the header and column types are inferred from the data.
df_data = (
    spark.read
    .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load(filename)
)


In [None]:
# Print the schema of the loaded DataFrame to check the column names and types.
df_data.printSchema()

In [None]:
# Preview the first 10 rows without truncating long cell values.
df_data.show(n=10, truncate=False)

In [None]:
# Report how many rows the input data set contains.
total_rows = df_data.count()
print("Total count for the input GO Sales dataset is "+ str(total_rows))

In [None]:
# Apache Spark machine learning model: prepare the data.
#
# DataFrame.randomSplit(weights, seed=None)
#   weights - list of doubles as weights with which to split the DataFrame.
#             Weights are normalized if they do not sum up to 1.0.
#   seed    - the seed for sampling.
#
# The split is 80% / 18% / 2% with a fixed seed of 24:
#   * train_data   - the largest group, used for training.
#   * test_data    - used for model evaluation, to test the assumptions of the model.
#   * predict_data - used for prediction.
splitted_data = df_data.randomSplit([0.8, 0.18, 0.02], 24)
train_data, test_data, predict_data = splitted_data

# count() is a Spark action that runs a job every time it is called, so count
# each split once and reuse the numbers for the total.
train_count = train_data.count()
test_count = test_data.count()
predict_count = predict_data.count()

# To inspect a split, run e.g. train_data.show(10, False).
print("Number of training records: " + str(train_count))
print("Number of testing records : " + str(test_count))
print("Number of prediction records : " + str(predict_count))
print("total count of the dataset: " + str(predict_count + train_count + test_count))

In [None]:
# Imports needed to create the pipeline and train a model

from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline, Model

In [None]:
# Encode the string columns as numeric indices with the StringIndexer transformer.
#
# The label indexer is fitted on the complete data set right away, so its
# `labels` attribute is available before the pipeline itself is fitted.
stringIndexer_label = StringIndexer(
    inputCol="PRODUCT_LINE",
    outputCol="label").fit(df_data)

# The feature indexers are left unfitted here; they are used as pipeline stages.
stringIndexer_prof = StringIndexer(
    inputCol="PROFESSION",
    outputCol="PROFESSION_IX")
stringIndexer_gend = StringIndexer(
    inputCol="GENDER",
    outputCol="GENDER_IX")
stringIndexer_mar = StringIndexer(
    inputCol="MARITAL_STATUS",
    outputCol="MARITAL_STATUS_IX")

# To inspect the label encoding, transform the data with the fitted indexer:
#   stringIndexer_label.transform(df_data).show(50, False)

In [None]:
# Combine the indexed categorical columns and AGE into one "features" vector column.
vectorAssembler_features = VectorAssembler(
    inputCols=["GENDER_IX", "AGE", "MARITAL_STATUS_IX", "PROFESSION_IX"],
    outputCol="features")

In [None]:
# Estimator used for classification: a random forest that reads the
# "features" column and learns the "label" column.
rf = RandomForestClassifier(featuresCol="features", labelCol="label")


In [None]:
# Map the numeric predictions back to the original label strings, using the
# labels collected by the fitted label indexer.
labelConverter = IndexToString(
    inputCol="prediction",
    outputCol="predictedLabel",
    labels=stringIndexer_label.labels)



In [None]:
# A pipeline consists of transformers and an estimator, applied in this order:
# index the label and the categorical features, assemble the feature vector,
# run the classifier, then convert predictions back to label strings.
pipeline_stages = [
    stringIndexer_label,
    stringIndexer_prof,
    stringIndexer_gend,
    stringIndexer_mar,
    vectorAssembler_features,
    rf,
    labelConverter,
]
pipeline_rf = Pipeline(stages=pipeline_stages)

In [None]:
# Train the model.
# Print the schema of the training split first as a quick check of the input
# columns, then fit the pipeline on the training data.
train_data.printSchema()
model_rf = pipeline_rf.fit(train_data)


In [None]:
# Model accuracy, measured on the test split.
predictions = model_rf.transform(test_data)
evaluatorRF = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction")
accuracy = evaluatorRF.evaluate(predictions)

# The test error is the complement of the accuracy.
print("Accuracy = %g" % accuracy)
print("Test Error = %g" % (1.0 - accuracy))

In [None]:
# Imports needed to persist the model in the Watson Machine Learning repository

from repository_v3.mlrepositoryclient import MLRepositoryClient
from repository_v3.mlrepositoryartifact import MLRepositoryArtifact

In [None]:
# Watson Machine Learning service credentials.
#
# Secrets are not stored in the notebook: the username and password are read
# from environment variables and, when a variable is unset or empty, requested
# through a hidden getpass prompt. The dictionary keys are unchanged.
#
# NOTE(review): the username/password pair that used to be hardcoded in this
# cell has been visible to everyone with access to the notebook and should be
# rotated.
import getpass
import os


def _wml_secret(env_name, prompt):
    """Return environment variable `env_name`, prompting for it when unset or empty."""
    return os.environ.get(env_name) or getpass.getpass(prompt)


wml_credentials = {
    # Non-secret settings keep their previous values as defaults.
    "url": os.environ.get("WML_URL", "https://ibm-watson-ml.mybluemix.net"),
    "access_key": os.environ.get("WML_ACCESS_KEY", "***"),
    "username": _wml_secret("WML_USERNAME", "WML username: "),
    "password": _wml_secret("WML_PASSWORD", "WML password: "),
    "instance_id": os.environ.get("WML_INSTANCE_ID", "028b5393-6057-48a7-a8f2-a5d49679ed4f"),
}

In [None]:
# Create the repository client for the service URL, then authorize it with
# the username and password from the credentials dictionary.
ml_repository_client = MLRepositoryClient(wml_credentials['url'])
ml_repository_client.authorize(
    wml_credentials['username'],
    wml_credentials['password'])

In [None]:
# Wrap the pipeline definition as a repository artifact named "pipeline".
pipeline_artifact = MLRepositoryArtifact(
    pipeline_rf,
    name="pipeline")

In [None]:
# Wrap the fitted pipeline model as a repository artifact, together with its
# training data and the pipeline artifact it was built from.
model_artifact = MLRepositoryArtifact(
    model_rf,
    training_data=train_data,
    name="Product Line Prediction",
    pipeline_artifact=pipeline_artifact)

In [None]:
# Save the model artifact (which references the pipeline artifact) through the
# repository client. The returned object is kept because later cells read its
# metadata and uid.
saved_model = ml_repository_client.models.save(model_artifact)

In [None]:
# Get the saved model's metadata from Watson Machine Learning.
# meta.available_props() gives the list of available property names.
saved_model.meta.available_props()

In [None]:
# Print selected metadata properties of the saved model.
# Every value goes through str() so a property that is not a string cannot
# break the concatenation; print() is used so the cell parses on Python 2 and 3.
print("modelType: " + str(saved_model.meta.prop("frameworkName")))
print("creationTime: " + str(saved_model.meta.prop("creationTime")))
print("label_column: " + str(saved_model.meta.prop("label_column")))
print("inputDataSchema: " + str(saved_model.meta.prop("inputDataSchema")))
print("modelVersionUrl: " + str(saved_model.meta.prop("modelVersionUrl")))
print("framework_runtimes: " + str(saved_model.meta.prop("framework_runtimes")))


In [None]:
# Load the model back from the repository, looked up by the saved model's uid.
loadedModelArtifact = ml_repository_client.models.get(saved_model.uid)

In [None]:
# Show the name of the loaded model artifact.
# print() with a single argument behaves the same on Python 2 and Python 3.
print(str(loadedModelArtifact.name))

In [None]:
# Predict locally and visualize: take the model instance out of the loaded
# artifact and apply it to the prediction split.
loaded_model = loadedModelArtifact.model_instance()
predictions = loaded_model.transform(predict_data)

In [None]:
# Preview the first 5 prediction rows without truncating long cell values.
predictions.show(n=5, truncate=False)

In [None]:
# Count how many rows were assigned to each predicted label.
(
    predictions
    .select("predictedLabel")
    .groupBy("predictedLabel")
    .count()
    .show(truncate=False)
)