In [1]:
import findspark
findspark.init()

In [3]:
from pyspark.sql import SparkSession
spark=SparkSession.builder\
                          .master("local")\
                          .appName('classifier')\
                          .getOrCreate()
sc=spark.sparkContext

In [4]:
import sys
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.feature import HashingTF, IDF
import nltk
from nltk.corpus import stopwords

In [5]:
sqlContext = SQLContext(sc)

In [6]:
bus_df = spark.read.text('Data/business/*')
bus_df = bus_df.withColumn("category",lit("business"))

sport_df = spark.read.text('Data/sports/*')
sport_df = sport_df.withColumn("category",lit("sports"))

pol_df = spark.read.text('Data/politics/*')
pol_df = pol_df.withColumn("category",lit("politics"))

health_df = spark.read.text('Data/medical/*')
health_df = health_df.withColumn("category",lit("medical"))

merge_df1 = bus_df.union(sport_df)
merge_df2 = merge_df1.union(pol_df)
merge_df3 = merge_df2.union(health_df)

In [7]:
merge_df1 = bus_df.union(sport_df)
merge_df2 = merge_df1.union(pol_df)
merge_df3 = merge_df2.union(health_df)

In [8]:
data = merge_df3.select([column for column in merge_df3.columns])


In [9]:
data.show(5)


+--------------------+--------+
|               value|category|
+--------------------+--------+
|   Sections SEARC...|business|
|   Sections SEARC...|business|
|             NYTi...|business|
|   Image Credit P...|business|
|             NYTi...|business|
+--------------------+--------+
only showing top 5 rows



In [10]:
bus_udf = spark.read.text('Data/unknown/business/*')
bus_udf = bus_udf.withColumn("category",lit("business"))

sport_udf = spark.read.text('Data/unknown/sports/*')
sport_udf = sport_udf.withColumn("category",lit("sports"))

pol_udf = spark.read.text('Data/unknown/politics/*')
pol_udf = pol_udf.withColumn("category",lit("politics"))

health_udf = spark.read.text('Data/unknown/medical/*')
health_udf = health_udf.withColumn("category",lit("medical"))

merge_udf1 = bus_udf.union(sport_udf)
merge_udf2 = merge_udf1.union(pol_udf)
merge_udf3 = merge_udf2.union(health_udf)

unknown_data = merge_udf3.select([column for column in merge_udf3.columns])
unknown_data.show(5)

+--------------------+--------+
|               value|category|
+--------------------+--------+
|   Sections SEARC...|business|
|             NYTi...|business|
|   Sections SEARC...|business|
|             NYTi...|business|
|   Sections SEARC...|business|
+--------------------+--------+
only showing top 5 rows



In [11]:
regexTokenizer = RegexTokenizer(inputCol="value", outputCol="words", pattern="\\W")

In [13]:
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\Chakravarthi\AppData\Roaming\nltk_data...
[nltk_data]   Unzipping corpora\stopwords.zip.


True

In [14]:
add_stopwords=nltk.corpus.stopwords.words('english')
add_stopwords_1 = ["nytimes","com","sense","day","common","business","todays","said","food","review","sunday","letters","politics","events","terms","services","years","contributors","companies","listings","applications","tax","trump","president","contributing","make","think","woman","federal","called","system","found","american","sale","headline","arts","times","subscriptions","choices","privacy","take","jobs","books","account","accounts","television","nyc","writers","multimedia","journeys","editorials","photography","automobiles","paper","city","tool","sports","weddings","columnists","contribution","even","nyt","obituary","state","travel","advertise","pm","street","go","corrections","saturday","company","dance","states","real","movies","estate","percent","music","tech","living","science","fashion","please","opinion","art","new","york","time","u","wa","reading","ha","video","image","photo","credit","edition","magazine","oped","could","crossword","mr","term","feedback","index","get","also","b","help","year","health","united","education","week","think","guide","event","two","first","subscription","service","cut","is","nytimescom","section","sections","Sections","Home","home","Search","search","Skip","skip","content","navigation","View","view","mobile","version","Subscribe","subscribe","Now","now","Log","log","In","in","setting","settings","Site","site","Loading","loading","article","next","previous","Advertisement","ad","advertisement","Supported","supported","by","Share","share","Page","page","Continue","continue","main","story","newsletter","Sign","Up","Manage","email","preferences","Not","you","opt","out","contact","us","anytime","thank","subscribing","see","more","email"] 
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered1").setStopWords(add_stopwords)
stopwordsRemover1 = StopWordsRemover(inputCol="filtered1", outputCol="filtered").setStopWords(add_stopwords_1)


In [15]:
label_stringIdx = StringIndexer(inputCol = "category", outputCol = "label")
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=1000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover,stopwordsRemover1, hashingTF, idf, label_stringIdx])

In [16]:
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|               value|category|               words|           filtered1|            filtered|         rawFeatures|            features|label|
+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|   Sections SEARC...|business|[sections, search...|[sections, search...|[transcript, repu...|(1000,[0,1,2,3,5,...|(1000,[0,1,2,3,5,...|  2.0|
|   Sections SEARC...|business|[sections, search...|[sections, search...|[transcript, demo...|(1000,[0,1,2,3,7,...|(1000,[0,1,2,3,7,...|  2.0|
|             NYTi...|business|[nytimes, com, no...|[nytimes, com, lo...|[longer, supports...|(1000,[0,1,2,3,6,...|(1000,[0,1,2,3,6,...|  2.0|
|   Image Credit P...|business|[image, credit, p...|[image, credit, p...|[prop, styling, p...|(1000,[0,1,2,3,5,...|(1000,[0,1,2,3,5,...|  2.0|

In [17]:
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("value","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)
predictions.show(10)

+------------------------------+--------+------------------------------+-----+----------+
|                         value|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|            NYTimes.com no ...|politics|[0.9999871534388363,1.29209...|  0.0|       0.0|
|             NYTimes.com no...|politics|[0.9868358293776673,0.00503...|  0.0|       0.0|
|             NYTimes.com no...|politics|[0.9808640473510387,0.01207...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[0.9790239641585076,0.00353...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[0.9721610236099645,0.01183...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[0.9441271568827793,0.00510...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[0.9441271568827793,0.00510...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[0.9417085251681173,0.01749...|  0.0|       0.0|
|   Sectio

In [18]:
pipelineFit2 = pipeline.fit(unknown_data)
unknown_dataset = pipelineFit2.transform(unknown_data)

In [19]:
predictions2 = lrModel.transform(unknown_dataset)
predictions2.filter(predictions2['prediction'] == 0) \
    .select("value","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)
predictions2.show(10)

+------------------------------+--------+------------------------------+-----+----------+
|                         value|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|   Sections SEARCH Skip to ...|politics|[0.9078692900477618,0.02405...|  2.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[0.8756092637916708,0.00908...|  2.0|       0.0|
|   Sections SEARCH Skip to ...|business|[0.8260157131029408,0.03995...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[0.8182855057512108,0.04782...|  2.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[0.8182855057512108,0.04782...|  2.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[0.7519178488712814,0.02285...|  2.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[0.6568550406330272,0.14343...|  2.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[0.6107036144878749,0.10244...|  2.0|       0.0|
|   Sectio

In [20]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
#evaluator.evaluate(predictions)
print("-------Accuracy of test data using logistic_regression-----: " + str(evaluator.evaluate(predictions)*100)+"%")


from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
#evaluator.evaluate(predictions2)
print("-------Accuracy of unknown data using logistic_regression-----: " + str(evaluator.evaluate(predictions2)*100)+"%")

#training the data -- Naive Bayes
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)


-------Accuracy of test data using logistic_regression-----: 62.007149838196376%
-------Accuracy of unknown data using logistic_regression-----: 11.055148555148556%


In [21]:
predictions3 = model.transform(testData)
predictions3.filter(predictions3['prediction'] == 0) \
    .select("value","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)
predictions3.show(10)

+------------------------------+--------+------------------------------+-----+----------+
|                         value|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|        President Trump (@P...|politics|[1.0,1.0088217456763839E-22...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[1.0,2.3853534848808864E-25...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[1.0,7.761528270890524E-26,...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[1.0,2.7352875038364617E-26...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[1.0,4.9390977486455355E-27...|  0.0|       0.0|
|             NYTimes.com no...|politics|[1.0,5.177042629739778E-29,...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[1.0,1.3327914171670283E-29...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[1.0,1.3327914171670283E-29...|  0.0|       0.0|
|   Sectio

In [22]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("-------Accuracy of test data using naive_bayes-----: " + str(evaluator.evaluate(predictions3)*100)+"%")


-------Accuracy of test data using naive_bayes-----: 64.2304340042251%


In [23]:
predictions4 = model.transform(unknown_dataset)
predictions4.filter(predictions4['prediction'] == 0) \
    .select("value","category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)
predictions4.show(10)

+------------------------------+--------+------------------------------+-----+----------+
|                         value|category|                   probability|label|prediction|
+------------------------------+--------+------------------------------+-----+----------+
|   Sections SEARCH Skip to ...|politics|[1.0,2.0034883732711258E-17...|  2.0|       0.0|
|   Sections SEARCH Skip to ...|business|[1.0,1.3420024395784487E-53...|  0.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[1.0,1.1615753403061956E-53...|  2.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[1.0,5.709716963079426E-68,...|  2.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[1.0,6.264837857255992E-96,...|  2.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[0.9999999999999993,1.01844...|  2.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[0.9999999999999993,1.01844...|  2.0|       0.0|
|   Sections SEARCH Skip to ...|politics|[0.9999999999975344,3.10781...|  2.0|       0.0|
|   Sectio

In [24]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("-------Accuracy of unknown data using naive_bayes-----: " + str(evaluator.evaluate(predictions4)*100)+"%")

-------Accuracy of unknown data using naive_bayes-----: 9.173913043478262%
