####Install

In [0]:
%sh
pip install nltk
pip install stop-words
pip install pyspellchecker

You should consider upgrading via the '/databricks/python3/bin/python -m pip install --upgrade pip' command.
You should consider upgrading via the '/databricks/python3/bin/python -m pip install --upgrade pip' command.
You should consider upgrading via the '/databricks/python3/bin/python -m pip install --upgrade pip' command.


####TBD

1. Tokenization into words
2. Stop words removal
3. Noise reduction (e.g., removal of punctuation)
4. Stemmin

#### 1. Load Data

In [0]:
# load additional 1000 labeled data

import pandas as pd
import numpy as np
# File location and type
file_location = "/FileStore/tables/additional.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type).option("inferSchema", infer_schema).option("header", "true").option("sep", delimiter).load(file_location)

#display(df)
pandasDF_news = df.select('news').toPandas()
pandasDF_target = df.select('target').toPandas()


In [0]:
# load original dataset (based on research paper)
from sklearn.datasets import fetch_20newsgroups
import pandas as pd
import numpy as np


#categories = ['rec.autos', 'rec.sport.baseball', 'comp.graphics', 'comp.sys.mac.hardware', 
#              'sci.space', 'sci.crypt', 'talk.politics.guns', 'talk.religion.misc']
newsgroup = fetch_20newsgroups(subset='train',remove=('headers', 'footers', 'quotes'), shuffle=True, random_state=42)


df_news = pd.DataFrame(data=newsgroup.data, columns=['news']) 
df_news = df_news.append(pandasDF_news, ignore_index=True)

df_target = pd.DataFrame(data=newsgroup.target, columns=['target'])
df_target = df_target.append(pandasDF_target, ignore_index=True)
df_target = df_target.dropna()
df_target['target']=df_target.target.astype('int64')

# binary labels for linear SVM
df_binary_labels = pd.DataFrame(np.where (df_target < 10, 0, 1), columns=['Binary Label'])

In [0]:
import re

#data basic cleaning before merge it to pyspark.
df_news = df_news.replace(re.compile(r"From: \S*@\S*\s?"),"")
df_news = df_news.replace(re.compile('\s+')," ")
df_news = df_news.replace(re.compile("\'"),"")





df_news = df_news.dropna()

In [0]:
# Convert all the data to pyspark dataframe
from pyspark.sql.functions import lower, col
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
df_newsgroup = sqlContext.createDataFrame(pd.concat([df_news, df_target, df_binary_labels], axis=1))

#Convert news column to to lower case
df_train = df_newsgroup.withColumn('news', lower(df_newsgroup.news))



#### 2. Pipeline

In [0]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, StringIndexer,CountVectorizer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline


regexTokenizer = RegexTokenizer(inputCol="news", outputCol="news_words", pattern="\\W")
add_stopwords = ["http","https","amp","rt","t","c","the","subject","re",'.',',','', 'i i','?','\'\'',"''",'y','*','out','==','df','e.g.','\'m','\[',"'m",':', ')', '(','n\'t', '\'','``','``','\'s', 'https://','-'] 
stopwordsRemover = StopWordsRemover(inputCol="news_words", outputCol="filtered").setStopWords(add_stopwords)
countTF = CountVectorizer(inputCol=stopwordsRemover.getOutputCol(), outputCol="features")

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countTF])
pipelineFit = pipeline.fit(df_newsgroup)

dataset = pipelineFit.transform(df_newsgroup)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
#lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
#rf = RandomForestClassifier(featuresCol=idf.getOutputCol(), labelCol=string_indexer.getOutputCol(), maxDepth=10)
#rf_mod = rf.fit(trainingData)

svm = LinearSVC(featuresCol=countTF.getOutputCol(), labelCol='Binary Label', maxIter=5, regParam=0.01)
svm_model = svm.fit(trainingData)

#lrModel = lr.fit(trainingData)
#predictions = lrModel.transform(testData)
predictions = svm_model.transform(testData)


#### 3. Evaluate ML Model

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="Binary Label", predictionCol="prediction")
evaluator.evaluate(predictions)

accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

Accuracy = 0.7592548942622471
Test Error = 0.24074510573775287


#### 4. Parameter tuning

In [0]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator

trainingData1 = trainingData.drop("news_words","news_tf","news_tfidf","rawPrediction","probability","prediction","filtered","rawFeatures","CrossValidator_2b30ebf36fbb_rand")
testData1 = testData.drop("news_words","news_tf","news_tfidf","rawPrediction","probability","prediction","filtered","rawFeatures","CrossValidator_2b30ebf36fbb_rand")


#trainingData1.show(5)

#grid for randomforest
#grid = (ParamGridBuilder().baseOn([evaluator.metricName, 'precision']).addGrid(rf.maxDepth, [10, 20]).build())
grid = (ParamGridBuilder().baseOn([evaluator.metricName, 'precision']).addGrid(svm_model.maxIter, [10, 100]).addGrid(svm_model.regParam,[0.001, 0.01, 0.1, 1.0, 10.0]).build())

# Instanciation of a CrossValidator
cv = CrossValidator(estimator=svm, estimatorParamMaps=grid, evaluator=evaluator, numFolds=3)

# Transform the data and train the classifier on the training set
cv_model = cv.fit(trainingData1)

# Transform the data and perform predictions on the test set
df_test_pred1 = cv_model.transform(testData1)

# Evaluate the predictions done on the test set
evaluator.evaluate(df_test_pred1)

+----+------+------------+--------------+
|news|target|Binary Label|      features|
+----+------+------------+--------------+
|    |     0|           0|(111027,[],[])|
|    |     1|           0|(111027,[],[])|
|    |     1|           0|(111027,[],[])|
|    |     2|           0|(111027,[],[])|
|    |     2|           0|(111027,[],[])|
+----+------+------------+--------------+
only showing top 5 rows

 To try the new MLflow PySpark ML autologging feature, which will be enabled by default in an upcoming release, call `mlflow.pyspark.ml.autolog()`.
MLlib will automatically track trials in MLflow. After your tuning fit() call has completed, view the MLflow UI to see logged runs.
Out[48]: 0.8226720479399566

In [0]:

bestModel = cv_model.bestModel

print ('Best Param (regParam): ', bestModel._java_obj.getRegParam())
print ('Best Param (MaxIter): ', bestModel._java_obj.getMaxIter())
print ("Accuracy: " + str (evaluator.evaluate(df_test_pred1, {evaluator.metricName: "accuracy"})) )
print ("weightedPrecision: " + str (evaluator.evaluate(df_test_pred1, {evaluator.metricName: "weightedPrecision"})) )
print ("weightedRecall: " + str (evaluator.evaluate(df_test_pred1, {evaluator.metricName: "weightedRecall"}))  )
print ("weightedTruePositiveRate: " + str (evaluator.evaluate(df_test_pred1, {evaluator.metricName: "weightedTruePositiveRate"})) )
print ("weightedFalsePositiveRate: " + str (evaluator.evaluate(df_test_pred1, {evaluator.metricName: "weightedFalsePositiveRate"})) )
print ("weightedFMeasure: " + str (evaluator.evaluate(df_test_pred1, {evaluator.metricName: "weightedFMeasure"})) )
print ("truePositiveRateByLabel: " + str (evaluator.evaluate(df_test_pred1, {evaluator.metricName: "truePositiveRateByLabel"})))
print ("falsePositiveRateByLabel: " + str (evaluator.evaluate(df_test_pred1, {evaluator.metricName: "falsePositiveRateByLabel"})) )
print ("precisionByLabel: " + str (evaluator.evaluate(df_test_pred1, {evaluator.metricName: "precisionByLabel"})) )
print ("recallByLabel: " + str (evaluator.evaluate(df_test_pred1, {evaluator.metricName: "recallByLabel"})) )
print ("fMeasureByLabel: " + str (evaluator.evaluate(df_test_pred1, {evaluator.metricName: "fMeasureByLabel"})) )
#print ("logLoss: " + str (evaluator.evaluate(df_test_pred1, {evaluator.metricName: "logLoss"})) )
print ("hammingLoss: " + str (evaluator.evaluate(df_test_pred1, {evaluator.metricName: "hammingLoss"})) )



Best Param (regParam):  0.001
Best Param (MaxIter):  10
Accuracy: 0.8246170100369783
weightedPrecision: 0.8372535425400436
weightedRecall: 0.8246170100369783
weightedTruePositiveRate: 0.8246170100369783
weightedFalsePositiveRate: 0.17803989656447733
weightedFMeasure: 0.8226720479399566
truePositiveRateByLabel: 0.9238790406673618
falsePositiveRateByLabel: 0.2773019271948608
precisionByLabel: 0.7737991266375546
recallByLabel: 0.9238790406673618
fMeasureByLabel: 0.8422053231939163
hammingLoss: 0.17538298996302165


In [0]:
df_test_pred1.show(16)

df_test_pred1.select('news').collect()






+--------------------+------+------------+--------------------+--------------------+----------+
|                news|target|Binary Label|            features|       rawPrediction|prediction|
+--------------------+------+------------+--------------------+--------------------+----------+
|                    |     2|           0|      (111027,[],[])|[0.28375699268290...|       0.0|
|                    |     7|           0|      (111027,[],[])|[0.28375699268290...|       0.0|
|                    |     7|           0|      (111027,[],[])|[0.28375699268290...|       0.0|
|                    |     8|           0|      (111027,[],[])|[0.28375699268290...|       0.0|
|                    |     9|           0|      (111027,[],[])|[0.28375699268290...|       0.0|
|                    |    10|           1|      (111027,[],[])|[0.28375699268290...|       0.0|
|                    |    10|           1|      (111027,[],[])|[0.28375699268290...|       0.0|
|                    |    11|           