# Text Classification Using Spark

In [1]:
import pyspark as ps
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc
from string import punctuation
import re
import numpy as np
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import NGram
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, NaiveBayes

In [2]:
sc = ps.SparkContext("local", "Simple App")
sqlContext = SQLContext(sc)
spark = SparkSession(sc)
train_path='data.csv'
train_rdd = sc.textFile(train_path)

In [3]:
def tolower(lines):
    lines = lines.lower()
    return lines

In [4]:
def remove_numbers(s):
    s = re.sub('^[0-9]+', '', s)
    return(s)

In [5]:
train_rdd = train_rdd.map(tolower).map(remove_numbers)

In [6]:
def strip_punctuation(s):
    return ''.join(c for c in s if c not in punctuation)

In [7]:
def parseTrain(rdd):
 
    header = rdd.first()
    print(header)
    body = rdd.filter(lambda r: r!=header)
    def parseRow(row):
        print(row)
        for i in
        stopwords = ["like", "just", "will", "&amp","a", "about", "above", "after", "again", "against", "all", "am", "an", "and", "any", "are", "arent't", "as", "at", "be", "because", "been", "before", "being", "below","between","both","but","by","can", "can't", "cannot","could","couldn't","did","didn't","do","does","doesn't","doing","don't","down","during","each","few","for","from","further","had","hadn't","has","hasn't","have","haven't","having","he","he'd","he'll","he's", "hes", "her's" ,"her","here","here's","hers","herself","him","himself","his","how","how's","i","i'd","i'll","i'm","ive","if","in","into","is","isn't","it","it's","its","itself","let's","me","more","most","mustn't","my","myself","no","nor","not","of","off","on","once","only","or","other","ought","our","ours","ourselves","out","over","own","same","shan't","she","she'd","she'll","she's","should","shouldn't","so","some","such","than","that","that's","the","their","theirs","them","themselves","then","there","there's","these","they","they'd","they'll","they're","they've","this","those","through","to","too","under","until","up","very","was","wasn't","we","we'd","we'll","we're","we've","were","weren't","what","what's","when","when's","where","where's","which","while","who","who's","whom","why","why's","with","won't","would","wouldn't","you","you'd","you'll","you're","you've","your","yours","yourself","yourselves"]
        new_rowlist = [0,0]
        new_rowlist[0] = row_list[0]
        new_rowlist[1] = " ".join(row_list[1:])
        	new_rowlist[1] = new_rowlist[1].replace(i, " ")
        new_rowlist[1] = strip_punctuation(str(new_rowlist[1]))
        new_rowlist[1] = re.sub("\s\s+" , " ", str(new_rowlist[1]))
        return row_tuple
 
    rdd_parsed = body.map(parseRow)
 
    colnames = header.split(",")
 
    return rdd_parsed.toDF(colnames)

In [8]:
df = parseTrain(train_rdd)

tar,text


# Using Term Frequency

In [9]:
tokenizer = tokenizer(inputCol="text", outputCol="words")
hashtf = HashingTF(numFeatures=2**10, inputCol="words", outputCol='features')
label_stringIdx = StringIndexer(inputCol = "text", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, label_stringIdx])
pipelineFit = pipeline.fit(df)
train_df = pipelineFit.transform(df)
(train_set, test_set, final_testset) = train_df.randomSplit([0.8, 0.1, 0.1], seed = 1235)

# Logistic Regression

In [10]:
lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_set)
predictions = lrModel.transform(train_set)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
train_logistic = evaluator.evaluate(predictions)
predictions = lrModel.transform(test_set)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
test_logistic = evaluator.evaluate(predictions)
predictions = lrModel.transform(finalset)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
finaltest_logistic = evaluator.evaluate(predictions)


# Naive Bayes

In [11]:
nb = NaiveBayes(smoothing=1e-9)
nbModel = nb.fit(train_set)
pred = nbModel.transform(train_set)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
train_nb = evaluator.evaluate(pred)
pred = nbModel.transform(test_set)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
test_nb = evaluator.evaluate(pred)
pred = nbModel.transform(final_testset)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
finaltest_nb = evaluator.evaluate(predictions)


# Random Forests

In [12]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees = 150, maxDepth = 7)
rfModel = rf.fit(train_set)
pred = rfModel.transform(train_set)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
train_rf = evaluator.evaluate(pred)
pred = rfModel.transform(test_set)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
test_rf = evaluator.evaluate(pred)
pred = rfModel.transform(final_testset)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
finaltest_rf = evaluator.evaluate(predictions)

# Prediction on new data

In [13]:
predictions = lrModel.transform(final_testset)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
finaltest_logistic = evaluator.evaluate(predictions)

predictions = model.transform(final_testset)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
finaltest_nb = evaluator.evaluate(predictions)

predictions = rfModel.transform(final_testset)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
finaltest_rf = evaluator.evaluate(predictions)

In [15]:
print("\n Train Accuracy:\n")
print("Logistic Regression:\n", train_logistic)
print("Random Forest:\n", train_rf)
print("Naive Bayes:\n", train_nb)
print("\n Test Accuracy:")
print("Logistic Regression Accuracy:\n", test_logistic)
print("Random Forest Accuracy:\n", test_rf)
print("Naive Bayes Accuracy:\n", test_nb)

print("\n Final Accuracy Results:\n")
print("Logistic Regression Acc:\n", finaltest_logistic)
print("Random Forest Accuracy:\n", finaltest_rf)
print("Naive Bayes Accuracy:\n", finaltest_nb)



 Train Accuracy:

Logistic Regression:
 0.6434101018999994
Random Forest:
 0.5271271381803007
Naive Bayes:
 0.4739655196936644

 Test Accuracy:
Logistic Regression Accuracy:
 0.6768135639639883
Random Forest Accuracy:
 0.45711989989833335
Naive Bayes Accuracy:
 0.4547989126898396

 Final Accuracy Results:

Logistic Regression Acc:
 0.67882345545
Random Forest Accuracy:
 0.49066176809333334
Naive Bayes Accuracy:
 0.47848484871
