In [30]:
# Importing Library and setting environment path
import os
import sys

# set the path 

sparkPath = "C:/Users/mishr/Downloads/spark-2.0.2-bin-hadoop2.7/spark-2.0.2-bin-hadoop2.7/spark-2.0.2-bin-hadoop2.7"
os.environ['SPARK_HOME'] = sparkPath
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell'
sys.path.append(sparkPath + '/bin')
sys.path.append(sparkPath + '/python')
sys.path.append(sparkPath + '/python/pyspark')
sys.path.append(sparkPath + '/python/pyspark/lib')
sys.path.append(sparkPath + '/python/pyspark/lib/pyspark.zip')
sys.path.append(sparkPath + '/python/pyspark/lib/py4j-0.10.3-src.zip')
sys.path.append("C:/Program Files (x86)/Java/jre1.8.0_111/bin")

In [3]:
import pyspark

In [4]:
from pyspark import SparkContext, Row

In [5]:
sc = SparkContext()

In [6]:
#Reading the data from newCorpora.csv which is a tab separated file and reading it to originalRDD
originalRDD=sc.textFile("file:///E:/Ananya/MSBA_Carlson/Fall/Big Data/Project/newsCorpora.csv").map(lambda line: line.split("\t"))

In [7]:
#Checking first row of the originalRDD to have a view of its structure
originalRDD.first()

['1',
 'Fed official says weak data caused by weather, should not slow taper',
 'http://www.latimes.com/business/money/la-fi-mo-federal-reserve-plosser-stimulus-economy-20140310,0,1312750.story\\?track=rss',
 'Los Angeles Times',
 'b',
 'ddUyU0VZz0BRneMioxUPQVP6sIxvM',
 'www.latimes.com',
 '1394470370698']

In [8]:
#We select the date and title column from the originalRDD and read it to a new RDD called rowRDD
rowRDD = originalRDD.map(lambda p: Row(title=p[1], label=p[4]))

In [9]:
#Checking first row of the rowRDD to have a view of its structure
rowRDD.first()

Row(label='b', title='Fed official says weak data caused by weather, should not slow taper')

In [10]:
#We then filter out the news based on the various categories-business,entertainment,technology and medicine
business = rowRDD.filter(lambda line: line[0] == 'b')
entertainment = rowRDD.filter(lambda line: line[0] == 'e')
technology = rowRDD.filter(lambda line: line[0] == 't')
medicine = rowRDD.filter(lambda line: line[0] == 'm')

In [11]:
#For the purpose of classification we convert the category column to ordinal data format
businessLabel = business.map(lambda line: (0, line[1]))
entertainmentLabel = entertainment.map(lambda line: (1, line[1]))
technologyLabel = technology.map(lambda line: (3, line[1]))
medicineLabel = medicine.map(lambda line: (4, line[1]))

In [12]:
#We then combined all the different categories into a single RDD to analyse them further
newsAllLabel = sc.union([businessLabel, entertainmentLabel, technologyLabel, medicineLabel])
newsAllLabel.first()

(0, 'Fed official says weak data caused by weather, should not slow taper')

In [13]:
#We set the second column as title and the category column as label
newsAllLabelROW = newsAllLabel.map(lambda p: Row(title=p[1], label=p[0]))

In [14]:
#Importing SQLContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [15]:
#Converting newsAllLabelRow into dataframe and then registering it as a temporary table
news = sqlContext.createDataFrame(newsAllLabelROW)
news.registerTempTable("news")

In [47]:
#Setting the labels for the dataframe
news = news.selectExpr("cast(label as double) as label", "title")

In [48]:
#Viewing the schema structure of the news table
news.printSchema()

root
 |-- label: double (nullable = true)
 |-- title: string (nullable = true)



In [49]:
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml import Pipeline

In [50]:
# Step 1:Tokenize the review. 
tokenizer = Tokenizer(inputCol="title", outputCol="review_words")
reviewDF = tokenizer.transform(news)
reviewDF.show()

+-----+--------------------+--------------------+
|label|               title|        review_words|
+-----+--------------------+--------------------+
|  0.0|Fed official says...|[fed, official, s...|
|  0.0|Fed's Charles Plo...|[fed's, charles, ...|
|  0.0|US open: Stocks f...|[us, open:, stock...|
|  0.0|Fed risks falling...|[fed, risks, fall...|
|  0.0|Fed's Plosser: Na...|[fed's, plosser:,...|
|  0.0|Plosser: Fed May ...|[plosser:, fed, m...|
|  0.0|Fed's Plosser: Ta...|[fed's, plosser:,...|
|  0.0|Fed's Plosser exp...|[fed's, plosser, ...|
|  0.0|US jobs growth la...|[us, jobs, growth...|
|  0.0|ECB unlikely to e...|[ecb, unlikely, t...|
|  0.0|ECB unlikely to e...|[ecb, unlikely, t...|
|  0.0|EU's half-baked b...|[eu's, half-baked...|
|  0.0|Europe reaches cr...|[europe, reaches,...|
|  0.0|ECB FOCUS-Stronge...|[ecb, focus-stron...|
|  0.0|EU aims for deal ...|[eu, aims, for, d...|
|  0.0|Forex - Pound dro...|[forex, -, pound,...|
|  0.0|Noyer Says Strong...|[noyer, says, str...|


In [51]:
# Step 2:Remove stop words
remover = StopWordsRemover(inputCol="review_words", outputCol="filtered")
filteredDF = remover.transform(reviewDF)
filteredDF.show()

+-----+--------------------+--------------------+--------------------+
|label|               title|        review_words|            filtered|
+-----+--------------------+--------------------+--------------------+
|  0.0|Fed official says...|[fed, official, s...|[fed, official, s...|
|  0.0|Fed's Charles Plo...|[fed's, charles, ...|[fed's, charles, ...|
|  0.0|US open: Stocks f...|[us, open:, stock...|[us, open:, stock...|
|  0.0|Fed risks falling...|[fed, risks, fall...|[fed, risks, fall...|
|  0.0|Fed's Plosser: Na...|[fed's, plosser:,...|[fed's, plosser:,...|
|  0.0|Plosser: Fed May ...|[plosser:, fed, m...|[plosser:, fed, m...|
|  0.0|Fed's Plosser: Ta...|[fed's, plosser:,...|[fed's, plosser:,...|
|  0.0|Fed's Plosser exp...|[fed's, plosser, ...|[fed's, plosser, ...|
|  0.0|US jobs growth la...|[us, jobs, growth...|[us, jobs, growth...|
|  0.0|ECB unlikely to e...|[ecb, unlikely, t...|[ecb, unlikely, e...|
|  0.0|ECB unlikely to e...|[ecb, unlikely, t...|[ecb, unlikely, e...|
|  0.0

In [52]:
#Step 3:Convert to TF words vector
hashingTF = HashingTF(inputCol="filtered", outputCol="TF")
TFDF = hashingTF.transform(filteredDF)
TFDF.show()
## HashingTF in SparkML cannot normalize term frequency with the total number of words in each document
for features_label in TFDF.select("TF", "label").take(3):
    print(features_label)

# Convert to IDF words vector, ensure to name the features as 'features'
idf = IDF(inputCol="TF", outputCol="features")
idfModel = idf.fit(TFDF)
finalDF = idfModel.transform(TFDF)
finalDF.show()

+-----+--------------------+--------------------+--------------------+--------------------+
|label|               title|        review_words|            filtered|                  TF|
+-----+--------------------+--------------------+--------------------+--------------------+
|  0.0|Fed official says...|[fed, official, s...|[fed, official, s...|(262144,[4900,769...|
|  0.0|Fed's Charles Plo...|[fed's, charles, ...|[fed's, charles, ...|(262144,[1889,306...|
|  0.0|US open: Stocks f...|[us, open:, stock...|[us, open:, stock...|(262144,[7695,119...|
|  0.0|Fed risks falling...|[fed, risks, fall...|[fed, risks, fall...|(262144,[7695,583...|
|  0.0|Fed's Plosser: Na...|[fed's, plosser:,...|[fed's, plosser:,...|(262144,[30249,66...|
|  0.0|Plosser: Fed May ...|[plosser:, fed, m...|[plosser:, fed, m...|(262144,[1889,769...|
|  0.0|Fed's Plosser: Ta...|[fed's, plosser:,...|[fed's, plosser:,...|(262144,[1889,277...|
|  0.0|Fed's Plosser exp...|[fed's, plosser, ...|[fed's, plosser, ...|(262144,[2

In [53]:
finalDF.select('label').distinct().collect()

[Row(label=0.0), Row(label=1.0), Row(label=4.0), Row(label=3.0)]

In [54]:
for features_label in finalDF.select("features", "label").take(3):
    print(features_label)   

Row(features=SparseVector(262144, {4900: 6.5481, 7695: 5.3318, 27707: 6.8659, 44133: 6.1187, 70520: 8.966, 97162: 6.8259, 151894: 9.0632, 160735: 4.7292, 161826: 3.8551}), label=0.0)
Row(features=SparseVector(262144, {1889: 7.4098, 3067: 7.589, 81535: 9.521, 92646: 5.0543, 106453: 8.7209, 108453: 5.9721, 136020: 5.0913, 175637: 7.1143, 235090: 9.5877}), label=0.0)
Row(features=SparseVector(262144, {7695: 5.3318, 11910: 4.5874, 21872: 3.2963, 37521: 5.3421, 44133: 6.1187, 58267: 6.8682, 141063: 5.8936, 235090: 9.5877, 255396: 8.6923}), label=0.0)


In [55]:
#Using ML libraries to classify data 
#Algorithm used:Naive Bayes
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [56]:
# Split data into training and testing set 
(training, test) = finalDF.randomSplit([0.7, 0.3])

# Create a Naive Bayes instance
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# Use a pipeline to chain all transformers and estimators
pipeline = Pipeline(stages=[nb])

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.

paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0]).build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(metricName="weightedPrecision"),
                          numFolds=3) 

In [57]:
training.first()

Row(label=0.0, title='"Candy Crush Saga" Maker Prices IPO', review_words=['"candy', 'crush', 'saga"', 'maker', 'prices', 'ipo'], filtered=['"candy', 'crush', 'saga"', 'maker', 'prices', 'ipo'], TF=SparseVector(262144, {19749: 1.0, 84079: 1.0, 87758: 1.0, 100824: 1.0, 137422: 1.0, 183237: 1.0}), features=SparseVector(262144, {19749: 10.4701, 84079: 11.5687, 87758: 5.9075, 100824: 5.5936, 137422: 6.7626, 183237: 4.6816}))

In [None]:
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

# Make predictions on test documents. cvModel uses the best model found (lrModel).
prediction = cvModel.transform(test)
selected = prediction.select("title", "label", "probability", "prediction").take(5)
for row in selected:
    print(row)

# Evaluate result with accuracy and precision
evaluator = MulticlassClassificationEvaluator(metricName="weightedPrecision")

weightedPrecision = evaluator.evaluate(prediction)
weightedPrecision

evaluator_accuracy=MulticlassClassificationEvaluator(metricName="accuracy")

accuracy = evaluator_accuracy.evaluate(prediction)
accuracy