# Classification of text data using Apache Spark and Python

## Reading Article data into a dataframe and adding a column with corresponding label to it.

In [126]:
import os
import pandas as pd 
import re

In [127]:
def load_files(folder, label):
    files = []
    for filename in os.listdir(folder):
        crs = open(folder+"/"+filename,'r', encoding="utf8").read()
        new_str = re.sub('[^a-zA-Z0-9 ]', '', crs)
        #print(filename)\
        lists.append([classes[label],new_str])
    return files

folders = [
    '../data/business',
    '../data/music',
    '../data/politics',
    '../data/sports',
]

classes = [
    'business',
    'music',
    'politics',
    'sports',
]

cols = ['my_label', 'my_data']
lists = []
label = 0

for folder in folders:
    files = load_files(folder,label)
    label = label+1
   

df1 = pd.DataFrame(lists, columns=cols)

df1

df1.to_csv("dataFrame.csv", encoding='utf-8', index=False)

## Importing all the libraries required for preprocessing and classification

In [128]:
import time
import pyspark
import os
import csv
import sys
import re
from numpy import array
from pyspark.mllib.regression import LabeledPoint
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import NaiveBayes

In [129]:
# to stop an already existing context if any.
# don't run if u haven't initiated context already
sc.stop()

In [130]:
# making a context.
sc =SparkContext()
sqlContext = SQLContext(sc)
data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('dataFrame.csv')

In [131]:
data.printSchema()

root
 |-- my_label: string (nullable = true)
 |-- my_data: string (nullable = true)



In [132]:
# REGEX Tokenizer
rt = RegexTokenizer(inputCol="my_data", outputCol="words", pattern="\\W")
# Removal of stop words
astps = ["http","https","amp","rt","t","c","the","ssh","httprequest"] 
sr = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(astps)
# Word Count
cv = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
# tf idf
hTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms

In [133]:
# Creating the pipe line
label_stringIdx = StringIndexer(inputCol = "my_label", outputCol = "label")
pipeline = Pipeline(stages=[rt, sr, hTF, idf, label_stringIdx])
pFit = pipeline.fit(data)
data1 = pFit.transform(data)
data1.show(5)

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|my_label|             my_data|               words|            filtered|         rawFeatures|            features|label|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|business|IT takes both a s...|[it, takes, both,...|[it, takes, both,...|(10000,[1,5,7,14,...|(10000,[1,5,7,14,...|  0.0|
|business| IF you ask for i...|[if, you, ask, fo...|[if, you, ask, fo...|(10000,[0,1,7,15,...|(10000,[0,1,7,15,...|  0.0|
|business|NOT everyone is g...|[not, everyone, i...|[not, everyone, i...|(10000,[2,8,18,47...|(10000,[2,8,18,47...|  0.0|
|business|You know you need...|[you, know, you, ...|[you, know, you, ...|(10000,[1,2,15,21...|(10000,[1,2,15,21...|  0.0|
|business|PRO bono isnt jus...|[pro, bono, isnt,...|[pro, bono, isnt,...|(10000,[11,20,21,...|(10000,[11,20,21,...|  0.0|
+--------+--------------

# Classification of Test data

## Classification 1: Logistic Regression 

In [134]:
# splitting the training and test data into two sets
# ratios are 80% to 20%
(training_Data, test_Data) = data1.randomSplit([0.8, 0.2], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(training_Data)
predictions = lrModel.transform(test_Data)

In [135]:
# using MulticlassClassificationEvalutator to evalutate the accuracy of preditcions 
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("Accuracy in (%) is: ")
evaluator.evaluate(predictions)*100

Accuracy in (%) is: 


90.91348265261308

## Classification 2: Naive Bayesian

In [136]:
# Predicting using NaiveBayesian classification
nb = NaiveBayes(smoothing=1)
model = nb.fit(training_Data)
predictions = model.transform(test_Data)

In [137]:
# evaluating the accuracy of NaiveBayesian classifier on test data
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("Accuracy in (%) is: ")
evaluator.evaluate(predictions)*100

Accuracy in (%) is: 


93.12529137529137

# Classification of Unknown data

## Reading the unknown data and transforming it to fit our model

In [138]:
def load_files2(folder, label1):
    files = []
    for filename in os.listdir(folder):
        crs1 = open(folder+"/"+filename,'r', encoding="utf8").read()
        new_str1 = re.sub('[^a-zA-Z0-9 ]', ' ', crs1)
        #print(filename)\
        lists1.append([classes1[label1],new_str1])
    return files

folders1 = [
    '../unknown/business',
    '../unknown/music',
    '../unknown/politics',
    '../unknown/sports',
]

classes1 = [
    'business',
    'music',
    'politics',
    'sports',
]

cols = ['my_label', 'my_data']
lists1 = []
label1 = 0

for folder in folders1:
    files = load_files2(folder,label1)
    label1 = label1+1
   

df2 = pd.DataFrame(lists1, columns=cols)

df2

df2.to_csv("unknown.csv", encoding='utf-8', index=False)

In [139]:
unknownData = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('unknown.csv')


In [140]:
unknownData1 = pFit.transform(unknownData)

## Classification using Linear Regression for Unknown data

In [141]:
predictions1 = lrModel.transform(unknownData1)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("Accuracy in (%) is: ")
evaluator.evaluate(predictions1)*100

Accuracy in (%) is: 


90.57784320942216

## Classification using Naive Bayesian classifier for Unknown data

In [142]:
predictions2 = model.transform(unknownData1)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print("Accuracy in (%) is: ")
evaluator.evaluate(predictions2)*100

Accuracy in (%) is: 


96.95781342840166

# References 

## http://spark.apache.org/docs/2.2.0/api/python/_modules/pyspark/ml/

## http://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html

## https://spark.apache.org/docs/2.1.0/ml-classification-regression.html