In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, lit, udf
from pyspark.sql.types import *
from pyspark.ml.feature import Tokenizer
import string
import re
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, IDF
from pyspark.ml.tuning import *
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import pickle

# Obtain the notebook-wide Spark session (reuses an active one or creates it).
spark = SparkSession.builder.getOrCreate()

In [6]:
import itertools
import numpy as np
from pyspark import since, keyword_only
from pyspark.ml import Estimator, Model
from pyspark.ml.param import Params, Param, TypeConverters
from pyspark.ml.param.shared import HasSeed
from pyspark.sql.functions import rand

class Tuner(TrainValidationSplit):
    """Grid-search tuner that works on caller-supplied train/validation sets.

    Every parameter map in ``estimatorParamMaps`` is fitted on ``train`` and
    scored on ``validation`` with the configured evaluator; the best-scoring
    parameter map is then refitted on the union of both datasets.
    """

    def fit(self, train, validation):
        """Select the best parameter map and return the refitted model.

        NOTE: this signature intentionally differs from the usual
        ``Estimator.fit(dataset, params=None)``; callers must pass both
        datasets explicitly.

        :param train: DataFrame used to fit one model per parameter map.
        :param validation: DataFrame used to score each fitted model.
        :return: ``TrainValidationSplitModel`` wrapping the model refitted on
            ``train.union(validation)`` together with the list of validation
            metrics (one entry per parameter map, in grid order).
        :raises ValueError: if ``estimatorParamMaps`` is empty.
        """
        est = self.getOrDefault(self.estimator)
        epm = self.getOrDefault(self.estimatorParamMaps)
        eva = self.getOrDefault(self.evaluator)

        # Fail early with a clear message instead of letting numpy raise an
        # obscure "argmax of an empty sequence" error after the fitting step.
        if len(epm) == 0:
            raise ValueError(
                "estimatorParamMaps must contain at least one parameter map")

        # Passing the whole grid makes the estimator return one model per map.
        models = est.fit(train, epm)
        metrics = [
            eva.evaluate(model.transform(validation, params))
            for model, params in zip(models, epm)
        ]

        # Convert to a plain int so the index is not a numpy scalar.
        if eva.isLargerBetter():
            bestIndex = int(np.argmax(metrics))
        else:
            bestIndex = int(np.argmin(metrics))

        # union() matches columns by position, so both frames are expected to
        # share the same column order.
        bestModel = est.fit(train.union(validation), epm[bestIndex])
        return self._copyValues(TrainValidationSplitModel(bestModel, metrics))

In [7]:
def validateModel(model, filename, test_path='../dataset/merged/article/'):
    """Evaluate ``model`` on the held-out dataset and write a metrics report.

    The report contains one ``<metricName>: <value>`` line for, in order, the
    default metric of ``BinaryClassificationEvaluator``, the default metric of
    ``MulticlassClassificationEvaluator``, ``weightedPrecision``,
    ``weightedRecall`` and ``accuracy``.

    :param model: fitted transformer exposing ``transform(DataFrame)``.
    :param filename: path of the report file; it is overwritten.
    :param test_path: location of the test dataset; it must provide a
        ``_hyperpartisan`` column, which is cast to an integer ``label``.
    """
    test = spark.read.load(test_path)
    test = test.withColumn('label', test._hyperpartisan.cast('integer'))

    # The predictions are scanned once per metric; cache them so the whole
    # pipeline is not re-executed for each of the five evaluations.
    predictions = model.transform(test).cache()
    try:
        report = []

        binary_ev = BinaryClassificationEvaluator()
        report.append(
            f"{binary_ev.getMetricName()}: {binary_ev.evaluate(predictions)}\n")

        # First line uses the evaluator's default metric, then the explicit ones.
        multi_ev = MulticlassClassificationEvaluator()
        report.append(
            f"{multi_ev.getMetricName()}: {multi_ev.evaluate(predictions)}\n")
        for metric_name in ("weightedPrecision", "weightedRecall", "accuracy"):
            multi_ev.setMetricName(metric_name)
            report.append(
                f"{multi_ev.getMetricName()}: {multi_ev.evaluate(predictions)}\n")
    finally:
        predictions.unpersist()

    # Write only after every metric was computed, so a failing evaluation
    # cannot leave a truncated report behind.
    with open(filename, "w") as out:
        out.writelines(report)

In [9]:
# Load the train/validation splits and derive the integer `label` column from
# the `_hyperpartisan` flag.
# NOTE(review): the training data is read from 'publisherTest/' — confirm that
# this directory really holds the training split.
training = spark.read.load('../dataset/merged/publisherTest/')
training = training.withColumn('label', training._hyperpartisan.cast('integer'))
validation = spark.read.load('../dataset/merged/publisherValidation/')
validation = validation.withColumn('label', validation._hyperpartisan.cast('integer'))

# TF-IDF features on the pre-tokenised `words` column feeding a logistic
# regression classifier.
hashingTF = HashingTF(inputCol="words", outputCol="rawfeatures")
idf = IDF(inputCol="rawfeatures", outputCol="features")
lr = LogisticRegression()
pipeline = Pipeline(stages=[hashingTF, idf, lr])

# 3 x 3 x 2 x 2 = 36 parameter combinations.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [1000, 10000, 100000]) \
    .addGrid(idf.minDocFreq, [0, 10, 50]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.maxIter, [10, 20]) \
    .build()

tuner = Tuner(estimator=pipeline,
              estimatorParamMaps=paramGrid,
              evaluator=MulticlassClassificationEvaluator(metricName='accuracy'))

model = tuner.fit(train=training, validation=validation)

# Recorded result (presumably the best grid point of an earlier run):
# numFeatures: 1000, minDocFreq: 0, regParam: 0.1, maxIter: 20

# Evaluation is best-effort: a failure must not abort the notebook, but it is
# reported instead of being silently swallowed.
try:
    validateModel(model, "output_Tuner")
except Exception as exc:
    print(f"validateModel failed for output_Tuner: {exc!r}")


In [None]:
# Alternative input kept for reference: load the merged publisher dataset
# directly instead of combining the frames loaded earlier.
#df = spark.read.load('../dataset/merged/publisher/')
#df = df.withColumn('label', df._hyperpartisan.cast('integer'))
df = training.union(validation)

# Same TF-IDF + logistic regression pipeline as in the tuner experiment.
hashingTF = HashingTF(inputCol="words", outputCol="rawfeatures")
idf = IDF(inputCol="rawfeatures", outputCol="features")
lr = LogisticRegression()
pipeline = Pipeline(stages=[hashingTF, idf, lr])

# 3 x 3 x 2 x 2 = 36 parameter combinations.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [1000, 10000, 100000]) \
    .addGrid(idf.minDocFreq, [0, 10, 50]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.maxIter, [10, 20]) \
    .build()


# 10-fold cross-validation over the whole grid, selecting by accuracy.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(metricName='accuracy'),
                          numFolds=10)

model = crossval.fit(df)

# Recorded result (presumably the best grid point of an earlier run):
# numFeatures: 100k, minDocFreq: 50, regParam: 0.01, maxIter: 20

# Evaluation is best-effort: a failure must not abort the notebook, but it is
# reported instead of being silently swallowed.
try:
    validateModel(model, "output_CV")
except Exception as exc:
    print(f"validateModel failed for output_CV: {exc!r}")

# Append the cross-validation diagnostics to the report: the average metric of
# every grid point, the best average, and each average paired with its
# parameter map.
with open("output_CV","a") as f:
    print(model.avgMetrics, file=f)
    print(max(model.avgMetrics), file=f)
    print(list(zip(model.avgMetrics, paramGrid)), file=f)
    


In [8]:
# Shut down the Spark session used throughout the notebook.
spark.stop()