In [10]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.master("local[*]").appName("spam").getOrCreate()
sc=spark.sparkContext
spark.conf.set('spark.executor.memory', '45G')
spark.conf.set('spark.driver.memory', '80G')
spark.conf.set('spark.driver.maxResultSize', '10G')
from pyspark.ml.feature import CountVectorizer,NGram,IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [11]:
data1=sc.textFile("/user/edureka_1152044/spam_collection.txt").map(lambda x:x.split("\n")).map(lambda x:x[0])
data1.collect()

[u'ham\tGo until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...',
 u'ham\tOk lar... Joking wif u oni...',
 u"spam\tFree entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's",
 u'ham\tU dun say so early hor... U c already then say...',
 u"ham\tNah I don't think he goes to usf, he lives around here though",
 u"spam\tFreeMsg Hey there darling it's been 3 week's now and no word back! I'd like some fun you up for it still? Tb ok! XxX std chgs to send, \xa31.50 to rcv",
 u'ham\tEven my brother is not like to speak with me. They treat me like aids patent.',
 u"ham\tAs per your request 'Melle Melle (Oru Minnaminunginte Nurungu Vettam)' has been set as your callertune for all Callers. Press *9 to copy your friends Callertune",
 u'spam\tWINNER!! As a valued network customer you have been selected to receivea \xa3900 prize reward! To claim call 

In [12]:
stopwords=['ourselves','hers','between','yourself','but','again','there','about','once','during','out','very','having','with','they',
           'own','an','be','some','for','do','its','yours','such','into','of','most','itself','other','off','is','s','am','or','who',
           'as','from','him','each','the','themselves','until','below','are','we','these','your','his','through','don','nor','me','were',
           'her','more','himself','this','down','should','our','their', 'while', 'above', 'both', 'up', 'to', 'ours', 'had', 'she', 'all',
           'no', 'when', 'at', 'any', 'before', 'them', 'same', 'and', 'been', 'have', 'in', 'will', 'on', 'does', 'yourselves', 'then',
           'that', 'because', 'what', 'over', 'why', 'so', 'can', 'did', 'not', 'now', 'under', 'he', 'you', 'herself', 'has', 'just', 
           'where', 'too', 'only', 'myself', 'which', 'those', 'i', 'after', 'few', 'whom', 't', 'being', 'if', 'theirs', 'my', 'against',
           'a', 'by', 'doing', 'it', 'how', 'further', 'was', 'here', 'than','u','n','y','s']

In [13]:
limiter=['.',',',':',';','?',"-","\"","\'","!",")","(","[","]"]

In [14]:
def stop(rdd):
    line=None
    if rdd not in stopwords:
        line=rdd
    return line
def delimiter(rdd):
    for i in limiter:
        rdd=rdd.replace(i,"")
    return rdd
data2=data1.map(lambda x:x.split("\t")).map(lambda x:x[1]).map(lambda x: x.encode('utf-8')).map(delimiter)\
        .map(stop).map(lambda x:x.lower())
data2.collect()

['go until jurong point crazy available only in bugis n great world la e buffet cine there got amore wat',
 'ok lar joking wif u oni',
 'free entry in 2 a wkly comp to win fa cup final tkts 21st may 2005 text fa to 87121 to receive entry questionstd txt ratet&cs apply 08452810075over18s',
 'u dun say so early hor u c already then say',
 'nah i dont think he goes to usf he lives around here though',
 'freemsg hey there darling its been 3 weeks now and no word back id like some fun you up for it still tb ok xxx std chgs to send \xc2\xa3150 to rcv',
 'even my brother is not like to speak with me they treat me like aids patent',
 'as per your request melle melle oru minnaminunginte nurungu vettam has been set as your callertune for all callers press *9 to copy your friends callertune',
 'winner as a valued network customer you have been selected to receivea \xc2\xa3900 prize reward to claim call 09061701461 claim code kl341 valid 12 hours only',
 'had your mobile 11 months or more u r enti

In [15]:
label=data1.map(lambda x:x.split("\t")[0]).collect()

In [16]:
from pyspark.sql.types import *
import pandas as pd
dfpd=pd.DataFrame(data2.collect(),columns=['value'])
dfpd['label']=label
field=[StructField("value",StringType(),True),StructField("label",StringType(),True)]
schema=StructType(field)
df=spark.createDataFrame(dfpd,schema=schema)
df.show(5)

+--------------------+-----+
|               value|label|
+--------------------+-----+
|go until jurong p...|  ham|
|ok lar joking wif...|  ham|
|free entry in 2 a...| spam|
|u dun say so earl...|  ham|
|nah i dont think ...|  ham|
+--------------------+-----+
only showing top 5 rows



In [17]:
from pyspark.sql.functions import col,split,ltrim,regexp_replace
df1=df.withColumn("value",ltrim(col("value"))).withColumn("value",split(col("value")," ")).\
    withColumn("label",regexp_replace(col("label"),"ham","1")).\
    withColumn("label",regexp_replace(col("label"),"spam","0")).\
    withColumn("label",col("label").cast(IntegerType()))
df1.show(5)

+--------------------+-----+
|               value|label|
+--------------------+-----+
|[go, until, juron...|    1|
|[ok, lar, joking,...|    1|
|[free, entry, in,...|    0|
|[u, dun, say, so,...|    1|
|[nah, i, dont, th...|    1|
+--------------------+-----+
only showing top 5 rows



In [18]:
ngram=NGram(n=2,inputCol='value',outputCol='grams')
vector=CountVectorizer(inputCol='grams',outputCol='vector')
idf=IDF(inputCol='vector',outputCol='features')
num=[]
for i in range(0,100):
    num.append(i*0.01)
train,test=df1.randomSplit([0.7,0.3],2)

In [19]:
metrics=MulticlassClassificationEvaluator(labelCol='label',predictionCol='prediction',metricName='accuracy')
lr=LogisticRegression(maxIter=20)
pipeline=Pipeline(stages=[ngram,vector,idf,lr])
param=ParamGridBuilder().addGrid(lr.regParam,[0.01,0.1]).addGrid(ngram.n,[2,3]).build()
cv=CrossValidator(estimator=pipeline,estimatorParamMaps=param,evaluator=metrics,numFolds=4)
model=cv.fit(train)
result=model.transform(test)
result.show(3)

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|               value|label|               grams|              vector|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|[22, days, to, ki...|    0|[22 days, days to...|(33554,[14,16,104...|(33554,[14,16,104...|[0.52389204033697...|[0.62805740421976...|       0.0|
|[500, new, mobile...|    0|[500 new, new mob...|(33554,[216,418,4...|(33554,[216,418,4...|[4.61286982898126...|[0.99017420507003...|       0.0|
|[8, at, the, late...|    1|[8 at, at the, th...|(33554,[10,17,55,...|(33554,[10,17,55,...|[-5.6192704026718...|[0.00361417694677...|       1.0|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+------------------

In [20]:
denominator=test.count()
numerator=result.select("label","prediction").where(col("label")==col("prediction")).count()
float(numerator)/denominator

0.9464394400486914