In [2]:
# using findspark to get jupyter notebook to recognize spark
import findspark
findspark.init()

In [3]:
# loading the articles csv
from pyspark.sql import SQLContext
from pyspark import SparkContext
sc =SparkContext()
sqlContext = SQLContext(sc)
data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('./data/shuffled_articles.csv')

In [4]:
# checking the data
data.show(5)

+--------------------+--------+
|        article_text|category|
+--------------------+--------+
|PhotoGood morning...|politics|
|A version of this...|  sports|
|GLENN KENNY The o...|  movies|
|Federal accident ...|business|
|Ken Hechler, a le...|politics|
+--------------------+--------+
only showing top 5 rows



In [5]:
# look at the schema
data.printSchema()

root
 |-- article_text: string (nullable = true)
 |-- category: string (nullable = true)



In [6]:
# look at the categories
from pyspark.sql.functions import col
#data.groupBy("category").count().orderBy(col("count").desc()).show()

In [7]:
# tokenize and get vectors

from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="article_text", outputCol="words", pattern="\\W")

# getting stop words from nltk corpus
from nltk.corpus import stopwords
add_stop_words = list(set(stopwords.words('english')))

stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stop_words)

# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# setting up the pipeline
label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

+--------------------+--------+--------------------+--------------------+--------------------+-----+
|        article_text|category|               words|            filtered|            features|label|
+--------------------+--------+--------------------+--------------------+--------------------+-----+
|PhotoGood morning...|politics|[photogood, morni...|[photogood, morni...|(4152,[0,1,2,3,4,...|  1.0|
|A version of this...|  sports|[a, version, of, ...|[version, article...|(4152,[1,8,22,37,...|  2.0|
|GLENN KENNY The o...|  movies|[glenn, kenny, th...|[glenn, kenny, op...|(4152,[0,3,4,11,1...|  3.0|
|Federal accident ...|business|[federal, acciden...|[federal, acciden...|(4152,[2,10,17,61...|  0.0|
|Ken Hechler, a le...|politics|[ken, hechler, a,...|[ken, hechler, le...|(4152,[0,2,5,8,11...|  1.0|
+--------------------+--------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [9]:
# set seed for reproducibility and doing a 80-20 training/test split
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 279)

print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 395
Test Dataset Count: 80


In [10]:
# TF-DIF
# Extracting features

from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])
pipelineFit = pipeline.fit(data)

dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 279)

In [11]:
# fitting a naive bayes classifier 

from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)

predictions = model.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("article_text","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+--------+------------------------------+-----+----------+
|                  article_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|For devotees of President T...|business|[1.0,4.453107315457469E-19,...|  0.0|       0.0|
|MEAN bosses could have kill...|business|[1.0,2.5394656950292945E-21...|  0.0|       0.0|
|PARIS � The eurozone econom...|business|[1.0,1.5951862611678954E-22...|  0.0|       0.0|
|BUT for starters, as anyone...|business|[1.0,1.5218498006537328E-24...|  0.0|       0.0|
|To the Editor:Mr. Obama has...|business|[1.0,5.653790142303447E-30,...|  0.0|       0.0|
|You have to respect America...|politics|[1.0,2.6293152673047053E-33...|  1.0|       0.0|
|President Trump faces a cho...|business|[1.0,3.576130617812927E-34,...|  0.0|       0.0|
|LONDON � Barclays announced...|business|[1.0,3.445015165588454E-35,...|  0.0|       0.0|
|WASHINGTO

In [12]:
# evaluating the naive bayes classifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.7610570824524312

In [13]:
# Random Forest classifier

from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)

# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("article_text","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+--------+------------------------------+-----+----------+
|                  article_text|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|LONDON � The French insurer...|business|[0.4317033239799247,0.17201...|  0.0|       0.0|
|WASHINGTON � Small-business...|business|[0.4114852273958765,0.17811...|  0.0|       0.0|
|The inaction helped Toshiba...|business|[0.4114624849360925,0.18102...|  0.0|       0.0|
|For devotees of President T...|business|[0.40304342756103007,0.2053...|  0.0|       0.0|
|Airbus said Monday it was i...|business|[0.40285660071443663,0.2047...|  0.0|       0.0|
|The Business Insider boss H...|business|[0.39842789023948344,0.1882...|  0.0|       0.0|
|Since President Obama moved...|business|[0.37630731023653247,0.2327...|  0.0|       0.0|
|President Trump faces a cho...|business|[0.37509000964707495,0.2189...|  0.0|       0.0|
|The websi

In [14]:
# evaluating the random forest classifier
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.6661218487394958