In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, NaiveBayes
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

In [0]:
# Spark SQL entry point wrapping the SparkContext `sc`.
# NOTE(review): `sc` is not defined in the visible cells — presumably injected
# by the notebook runtime; confirm.
sqlContext = SQLContext(sc)

# Column layout for the dataset as (name, Spark type) pairs; every field is
# declared nullable.
newDF = [
    StructField(column, spark_type, True)
    for column, spark_type in (
        ("id", IntegerType()),
        ("text", StringType()),
        ("label", DoubleType()),
    )
]
finalSchema = StructType(newDF)



In [0]:
# Load the pipe-delimited CSV with the explicit schema.
# The schema has to go through DataFrameReader.schema(): handing it to
# options(schema=...) merely stores its string form as an option the CSV
# source does not use, which leaves every column typed as string.
# enforceSchema='false' asks Spark to validate the header names against the
# schema instead of silently applying the schema by position.
dataset = (
    sqlContext.read.format('csv')
    .schema(finalSchema)
    .options(header='true', delimiter='|', enforceSchema='false')
    .load('/FileStore/tables/dataset.csv')
)

# Defensive casts: effectively no-ops once the schema is applied, kept so the
# downstream stages always receive a double `label` and an integer `id`.
dataset = dataset.withColumn("label", dataset["label"].cast(DoubleType()))
dataset = dataset.withColumn("id", dataset["id"].cast(IntegerType()))

# Fixed seed so the 80/20 train/test split is reproducible across runs.
training, test = dataset.randomSplit([0.8, 0.2], seed=12345)

In [0]:

# Classifiers. Only `nb` is placed in the pipeline below; `lr` is defined but
# not added to the stages.
lr = LogisticRegression(maxIter=2, regParam=0.001)
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# Feature stages, chained through their column names:
#   text -> words -> filtered -> features
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="features")

# Full pipeline: feature extraction followed by the Naive Bayes classifier.
feature_stages = [tokenizer, remover, hashingTF]
pipeline = Pipeline(stages=feature_stages + [nb])

In [0]:
# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Score the held-out split once and cache it: each count() below and every
# later metric computed from `result` launches its own Spark job, which would
# otherwise re-run the whole transform.
result = (
    model.transform(test)
    .select("features", "label", "prediction")
    .cache()
)

# Counting the scored rows also materializes the cache. The pipeline stages
# do not drop rows, so this equals the size of the test split.
total = result.count()
if total == 0:
    # Fail with a clear message instead of a bare ZeroDivisionError.
    raise ValueError("Test split is empty; cannot compute accuracy")

correct = result.where(result["label"] == result["prediction"])
# float() keeps this a true division even on a Python 2 kernel.
accuracy = correct.count() / float(total)
print("Accuracy of model = "+str(accuracy))
test_error = 1 - accuracy
print ("Test error = "+str(test_error))


Accuracy of model = 0.5104166666666666
Test error = 0.48958333333333337


In [0]:
# Multiclass metrics on the scored test set. A single evaluator is reused and
# only its metric name is switched between runs; the order is kept so that
# `evaluator` and `metric` end up holding the precision configuration/value,
# as before.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
for metric_label, metric_name in (
    ("F1 metric", "f1"),
    ("Recall", "weightedRecall"),
    ("Precision", "weightedPrecision"),
):
    metric = evaluator.setMetricName(metric_name).evaluate(result)
    print(metric_label + " = " + str(metric))

# Persist the fitted pipeline. overwrite() makes the cell re-runnable:
# a plain save() raises when the target path already exists.
model.write().overwrite().save("nbmodelNew")

F1 metric = 0.46053494606126183
Recall = 0.5104166666666667
Precision = 0.5964778414606092
