In [1]:
import os

In [2]:
# Echo the `sc` object so its repr is shown as the cell output.
# NOTE(review): presumably the SparkContext injected by the PySpark kernel/shell
# (it is used as `sc.textFile(...)` below) — it is never created in this notebook.
sc

In [3]:
### Create DataFrame ###
from pyspark.sql import SQLContext, Row

# Read the historical file as an RDD of raw text lines.
lines = sc.textFile("file:///Users/Administrator/Desktop/spark/out3.txt")

# Break every line into its tab-delimited fields.
words = lines.map(lambda line: line.split("\t"))

# The first record is treated as the header; keep only records that differ from it.
header = words.first()
words = words.filter(lambda fields: fields != header)

# Build the DataFrame with explicit column names and preview it.
column_names = ["Title", "Description", "Topic", "nT"]
data = sqlContext.createDataFrame(words, column_names)
data.show(5)

# Keep only the three text columns, i.e. drop "nT".
data = data.select("Title", "Description", "Topic")
data.show(5)
data.printSchema()

+--------------------+--------------------+---------------+---+
|               Title|         Description|          Topic| nT|
+--------------------+--------------------+---------------+---+
|"Meek Mill Wishes...|"'I wasn't feelin...|"entertainment"|"1"|
|"A Victim Of Reve...| "Don't leak nudes."|"entertainment"|"1"|
|"Make A Pitcher O...|   "It's wine time."|"entertainment"|"1"|
|"Create A Summer ...|"Which tune will ...|"entertainment"|"1"|
|"Everyone Is A Co...|"Are you a combo ...|"entertainment"|"1"|
+--------------------+--------------------+---------------+---+
only showing top 5 rows

+--------------------+--------------------+---------------+
|               Title|         Description|          Topic|
+--------------------+--------------------+---------------+
|"Meek Mill Wishes...|"'I wasn't feelin...|"entertainment"|
|"A Victim Of Reve...| "Don't leak nudes."|"entertainment"|
|"Make A Pitcher O...|   "It's wine time."|"entertainment"|
|"Create A Summer ...|"Which tune will 

In [4]:
# Number of rows per topic, largest group first.
from pyspark.sql.functions import col

topic_counts = data.groupBy("Topic").count()
topic_counts.orderBy(col("count").desc()).show()

+---------------+-----+
|          Topic|count|
+---------------+-----+
|       "sports"|  885|
|   "technology"|  743|
|     "business"|  679|
|      "science"|  674|
|"entertainment"|  647|
|       "health"|  380|
+---------------+-----+



In [12]:
### Model Pipeline ###
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# Regular-expression tokenizers, one per text column.
# "Title" -> "words1"
regexTokenizer1 = RegexTokenizer(
    inputCol="Title",
    outputCol="words1",
    pattern="\\W",
)
# "Description" -> "words2"
regexTokenizer2 = RegexTokenizer(
    inputCol="Description",
    outputCol="words2",
    pattern="\\W",
)

In [13]:
# Stop-word removal: both removers share Spark's default "english" stop-word list.
add_stopwords = StopWordsRemover.loadDefaultStopWords("english")

# Title tokens: "words1" -> "filtered1"
stopwordsRemover1 = StopWordsRemover(
    inputCol="words1",
    outputCol="filtered1",
    stopWords=add_stopwords,
)
# Description tokens: "words2" -> "filtered2"
stopwordsRemover2 = StopWordsRemover(
    inputCol="words2",
    outputCol="filtered2",
    stopWords=add_stopwords,
)

In [14]:
# Bag-of-words term counts, built separately for the title and the description.
# Both vectorizers share the same vocabulary settings.
vocab_options = dict(vocabSize=10000, minDF=5)

countVectors1 = CountVectorizer(inputCol="filtered1", outputCol="features1", **vocab_options)
countVectors2 = CountVectorizer(inputCol="filtered2", outputCol="features2", **vocab_options)

In [17]:
# Label indexing and pipeline assembly
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Map the string "Topic" column to a numeric "label" column.
label_stringIdx = StringIndexer(inputCol="Topic", outputCol="label")

# Stage order: tokenize -> remove stop words -> count terms -> index the label.
pipeline_stages = [
    regexTokenizer1,
    regexTokenizer2,
    stopwordsRemover1,
    stopwordsRemover2,
    countVectors1,
    countVectors2,
    label_stringIdx,
]
pipeline = Pipeline(stages=pipeline_stages)

In [18]:
# Fit all pipeline stages on the complete DataFrame (the train/test split
# happens in a later cell), then apply the fitted pipeline to the same rows.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

# Preview the transformed rows.
dataset.show(n=5)

+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|               Title|         Description|          Topic|              words1|              words2|           filtered1|           filtered2|           features1|           features2|label|
+--------------------+--------------------+---------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|"Meek Mill Wishes...|"'I wasn't feelin...|"entertainment"|[meek, mill, wish...|[i, wasn, t, feel...|[meek, mill, wish...|     [wasn, feeling]|  (1678,[870],[1.0])|(3064,[952,2758],...|  4.0|
|"A Victim Of Reve...| "Don't leak nudes."|"entertainment"|[a, victim, of, r...|[don, t, leak, nu...|[victim, revenge,...|       [leak, nudes]|(1678,[169,1109,1...|        (3064,[],[])|  4.0|
|"Make A Pitcher O...|   "It's wine time

In [20]:
# Drop the string/token columns; keep just the two count vectors and the label.
dataset = dataset.select("features1", "features2", "label")
dataset.show(n=5)

+--------------------+--------------------+-----+
|           features1|           features2|label|
+--------------------+--------------------+-----+
|  (1678,[870],[1.0])|(3064,[952,2758],...|  4.0|
|(1678,[169,1109,1...|        (3064,[],[])|  4.0|
|(1678,[30,32,83,1...|   (3064,[10],[1.0])|  4.0|
|(1678,[83,122,502...|        (3064,[],[])|  4.0|
|(1678,[0,17,174,2...|        (3064,[],[])|  4.0|
+--------------------+--------------------+-----+
only showing top 5 rows



In [21]:
## Create a single feature vector out of the two bag-of-words columns ##
# Concatenates "features1" (title counts) and "features2" (description counts)
# into one "features" vector column.
assembler = VectorAssembler(
    inputCols=["features1", "features2"],
    outputCol="features")

# Fix: this previously transformed `dataset2`, a name that is never defined in
# the notebook, so the cell raised NameError on a fresh kernel. `dataset` holds
# exactly the features1/features2/label columns shown in the recorded output.
output = assembler.transform(dataset)
output.show(5)

# Keep only what the classifiers consume.
# NOTE: `dataset` is overwritten here, so re-running this cell on its own fails
# (the "features1"/"features2" columns are gone); re-run from the pipeline cell.
dataset = output.select(['features', 'label'])
dataset.show(5)

+--------------------+--------------------+-----+--------------------+
|           features1|           features2|label|            features|
+--------------------+--------------------+-----+--------------------+
|  (1678,[870],[1.0])|(3064,[952,2758],...|  4.0|(4742,[870,2630,4...|
|(1678,[169,1109,1...|        (3064,[],[])|  4.0|(4742,[169,1109,1...|
|(1678,[30,32,83,1...|   (3064,[10],[1.0])|  4.0|(4742,[30,32,83,1...|
|(1678,[83,122,502...|        (3064,[],[])|  4.0|(4742,[83,122,502...|
|(1678,[0,17,174,2...|        (3064,[],[])|  4.0|(4742,[0,17,174,2...|
+--------------------+--------------------+-----+--------------------+
only showing top 5 rows

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(4742,[870,2630,4...|  4.0|
|(4742,[169,1109,1...|  4.0|
|(4742,[30,32,83,1...|  4.0|
|(4742,[83,122,502...|  4.0|
|(4742,[0,17,174,2...|  4.0|
+--------------------+-----+
only showing top 5 rows



In [22]:
### Split Data into Training and Test ###
# 70/30 random split with a fixed seed so the partition is repeatable.
trainingData, testData = dataset.randomSplit([0.7, 0.3], seed=100)

# Report the size of each partition.
train_count = trainingData.count()
test_count = testData.count()
print("Training Dataset Count: " + str(train_count))
print("Test Dataset Count: " + str(test_count))

Training Dataset Count: 2807
Test Dataset Count: 1201


In [26]:
### Predictive Modeling ###
# Naive Bayes
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Train on the training split and score the held-out test split.
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)

# Inspect the test rows predicted as class 0, sorted by descending probability.
# NOTE(review): "label" is selected twice; kept so the table matches the recorded output.
predictions.filter(predictions['prediction'] == 0) \
    .select("features","label","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

# F1 score on the test split.
# Fix: this was labelled "Accuracy", but MulticlassClassificationEvaluator
# defaults to metricName="f1". The metric is now spelled out so the reported
# number is not misread; use metricName="accuracy" if accuracy is wanted.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName="f1")
evaluator.evaluate(predictions)

+------------------------------+-----+------------------------------+-----+----------+
|                      features|label|                   probability|label|prediction|
+------------------------------+-----+------------------------------+-----+----------+
|(4742,[9,15,155,260,1387,14...|  0.0|[1.0,5.048048400258517E-17,...|  0.0|       0.0|
|(4742,[29,71,85,118,137,207...|  0.0|[1.0,3.278664053519545E-17,...|  0.0|       0.0|
|(4742,[9,18,49,100,157,1686...|  0.0|[1.0,3.8356342639503773E-19...|  0.0|       0.0|
|(4742,[8,9,15,155,260,734,1...|  0.0|[1.0,1.8045165068184668E-19...|  0.0|       0.0|
|(4742,[8,9,15,155,260,734,1...|  0.0|[1.0,1.8045165068184668E-19...|  0.0|       0.0|
|(4742,[60,65,76,196,292,168...|  0.0|[1.0,6.110681048693149E-20,...|  0.0|       0.0|
|(4742,[148,390,608,1605,167...|  0.0|[1.0,5.21087835933215E-20,6...|  0.0|       0.0|
|(4742,[13,52,62,68,79,89,27...|  0.0|[1.0,4.7932548105260656E-20...|  0.0|       0.0|
|(4742,[12,29,68,120,251,463...|  0.0|[1.0,

0.872029934085848

In [27]:
# Logistic Regression
# Train on the training split and score the held-out test split.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)

# Inspect the test rows predicted as class 0, sorted by descending probability.
predicted_class0 = predictions.filter(predictions['prediction'] == 0)
(predicted_class0
    .select("features", "label", "probability", "label", "prediction")
    .orderBy("probability", ascending=False)
    .show(n=10, truncate=30))

# Score the test predictions with the evaluator's default metric.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

+------------------------------+-----+------------------------------+-----+----------+
|                      features|label|                   probability|label|prediction|
+------------------------------+-----+------------------------------+-----+----------+
|(4742,[13,29,60,62,63,85,20...|  0.0|[0.9986138714869124,4.49256...|  0.0|       0.0|
|(4742,[13,29,60,62,63,85,20...|  0.0|[0.9986138714869124,4.49256...|  0.0|       0.0|
|(4742,[23,48,86,147,313,323...|  0.0|[0.99828719852413,7.1850651...|  0.0|       0.0|
|(4742,[23,48,86,147,313,323...|  0.0|[0.99828719852413,7.1850651...|  0.0|       0.0|
|(4742,[29,71,85,118,137,207...|  0.0|[0.9933366316341055,0.00225...|  0.0|       0.0|
|(4742,[9,29,35,65,81,85,226...|  0.0|[0.9929585291224485,0.00206...|  0.0|       0.0|
|(4742,[9,11,220,482,666,157...|  0.0|[0.9903787711966406,0.00225...|  0.0|       0.0|
|(4742,[9,11,220,482,666,157...|  0.0|[0.9903787711966406,0.00225...|  0.0|       0.0|
|(4742,[71,535,563,862,1004,...|  0.0|[0.99

0.8999653112236674