In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.sql('''select 'spark' as hello''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [3]:
from pyspark.sql.functions import *

In [4]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [5]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

In [8]:
horror_data = spark.read.csv('horror-train.csv', inferSchema=True, header=True)

In [9]:
display(horror_data)

DataFrame[id: string, text: string, author: string]

In [10]:
from pyspark.ml.feature import HashingTF, Tokenizer, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.pipeline import Pipeline

In [11]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")

In [12]:
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

In [13]:
labeler = StringIndexer(inputCol="author", outputCol="label", handleInvalid="keep")

In [14]:
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)

In [15]:
paramGrid = ParamGridBuilder().addGrid(hashingTF.numFeatures, [10, 100, 1000]).addGrid(lr.regParam, [0.1, 0.01]).build()

In [16]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [17]:
pipeline = Pipeline(stages=[tokenizer, hashingTF, labeler, lr])

In [18]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=5)

In [19]:
train, test = horror_data.randomSplit([.8,.2])

In [20]:
crossval_model = crossval.fit(train)

In [21]:
best_model = crossval_model.bestModel

In [22]:
eval = MulticlassClassificationEvaluator()
pred = best_model.transform(test)
eval.evaluate(pred)

0.5758593233252097

In [23]:
best_model.stages[1]._java_obj.getNumFeatures()

1000

In [24]:
from pyspark.ml.tuning import TrainValidationSplit

In [25]:
tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=MulticlassClassificationEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

In [26]:
tvs_model = tvs.fit(train)

In [27]:
tvs_model.bestModel.stages[3]._java_obj.getRegParam()

0.1

In [28]:
tvs_model.bestModel.stages[1]._java_obj.getNumFeatures()

1000

In [29]:
pred = tvs_model.bestModel.transform(horror_data)

In [30]:
eval = MulticlassClassificationEvaluator()

In [31]:
eval.evaluate(pred)

0.6087875341287915

In [32]:
from pyspark.ml.classification import NaiveBayes

In [33]:
mnb = NaiveBayes(smoothing=1.0, modelType="multinomial", featuresCol="features", labelCol="label")

In [34]:
pipeline = Pipeline(stages=[tokenizer, hashingTF, labeler, mnb])
paramGrid = ParamGridBuilder().addGrid(hashingTF.numFeatures, [10000,50000]).addGrid(lr.regParam, [0.1, 0.01]).build()

In [35]:
tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=MulticlassClassificationEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)