In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spam detector').getOrCreate()

In [2]:
data = spark.read.csv('/home/ubuntu/Course_Notes/Spark_for_Machine_Learning/Natural_Language_Processing/smsspamcollection/SMSSpamCollection',inferSchema=True, sep = '\t')

In [3]:
data.show()

+----+--------------------+
| _c0|                 _c1|
+----+--------------------+
| ham|Go until jurong p...|
| ham|Ok lar... Joking ...|
|spam|Free entry in 2 a...|
| ham|U dun say so earl...|
| ham|Nah I don't think...|
|spam|FreeMsg Hey there...|
| ham|Even my brother i...|
| ham|As per your reque...|
|spam|WINNER!! As a val...|
|spam|Had your mobile 1...|
| ham|I'm gonna be home...|
|spam|SIX chances to wi...|
|spam|URGENT! You have ...|
| ham|I've been searchi...|
| ham|I HAVE A DATE ON ...|
|spam|XXXMobileMovieClu...|
| ham|Oh k...i'm watchi...|
| ham|Eh u remember how...|
| ham|Fine if thats th...|
|spam|England v Macedon...|
+----+--------------------+
only showing top 20 rows



In [4]:
data = data.withColumnRenamed('_c0','class').withColumnRenamed('_c1','sms')

In [5]:
data.show(5)

+-----+--------------------+
|class|                 sms|
+-----+--------------------+
|  ham|Go until jurong p...|
|  ham|Ok lar... Joking ...|
| spam|Free entry in 2 a...|
|  ham|U dun say so earl...|
|  ham|Nah I don't think...|
+-----+--------------------+
only showing top 5 rows



In [6]:
from pyspark.sql.functions import length

In [7]:
data = data.withColumn('length',length(data['sms']))

In [8]:
data.show()

+-----+--------------------+------+
|class|                 sms|length|
+-----+--------------------+------+
|  ham|Go until jurong p...|   111|
|  ham|Ok lar... Joking ...|    29|
| spam|Free entry in 2 a...|   155|
|  ham|U dun say so earl...|    49|
|  ham|Nah I don't think...|    61|
| spam|FreeMsg Hey there...|   147|
|  ham|Even my brother i...|    77|
|  ham|As per your reque...|   160|
| spam|WINNER!! As a val...|   157|
| spam|Had your mobile 1...|   154|
|  ham|I'm gonna be home...|   109|
| spam|SIX chances to wi...|   136|
| spam|URGENT! You have ...|   155|
|  ham|I've been searchi...|   196|
|  ham|I HAVE A DATE ON ...|    35|
| spam|XXXMobileMovieClu...|   149|
|  ham|Oh k...i'm watchi...|    26|
|  ham|Eh u remember how...|    81|
|  ham|Fine if thats th...|    56|
| spam|England v Macedon...|   155|
+-----+--------------------+------+
only showing top 20 rows



In [9]:
data.groupBy('class').mean('length').show()

+-----+-----------------+
|class|      avg(length)|
+-----+-----------------+
|  ham|71.45431945307645|
| spam|138.6706827309237|
+-----+-----------------+



In [10]:
from pyspark.ml.feature import CountVectorizer,StopWordsRemover,StringIndexer,IDF,Tokenizer

In [11]:
tokenizer = Tokenizer(inputCol='sms',outputCol='tokened_text')
stopwordsremover = StopWordsRemover(inputCol='tokened_text',outputCol='filtered')
count_vec = CountVectorizer(inputCol='filtered',outputCol='CV_tokens')
idf = IDF(inputCol='CV_tokens',outputCol='tf_idf')
indexer = StringIndexer(inputCol='class',outputCol='label')

In [12]:
from pyspark.ml.feature import VectorAssembler

In [13]:
assembler = VectorAssembler(inputCols=['tf_idf','length'],outputCol= 'features')

In [14]:
from pyspark.ml.classification import NaiveBayes

In [15]:
nb = NaiveBayes()

In [16]:
from pyspark.ml import Pipeline

In [17]:
data_pipe = Pipeline(stages = [indexer, tokenizer,stopwordsremover,count_vec,idf,assembler])

In [18]:
final_df = data_pipe.fit(data).transform(data).select(['features','label'])

In [19]:
final_df.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(13459,[8,12,33,6...|  0.0|
|(13459,[0,26,308,...|  0.0|
|(13459,[2,14,20,3...|  1.0|
|(13459,[0,73,84,1...|  0.0|
|(13459,[36,39,140...|  0.0|
|(13459,[11,57,62,...|  1.0|
|(13459,[11,55,108...|  0.0|
|(13459,[133,195,4...|  0.0|
|(13459,[1,50,124,...|  1.0|
|(13459,[0,1,14,29...|  1.0|
|(13459,[5,19,36,4...|  0.0|
|(13459,[9,18,40,9...|  1.0|
|(13459,[14,32,50,...|  1.0|
|(13459,[42,99,101...|  0.0|
|(13459,[567,1745,...|  0.0|
|(13459,[32,113,11...|  1.0|
|(13459,[86,224,47...|  0.0|
|(13459,[0,2,52,13...|  0.0|
|(13459,[0,77,107,...|  0.0|
|(13459,[4,32,35,6...|  1.0|
+--------------------+-----+
only showing top 20 rows



In [20]:
train,test = final_df.randomSplit([0.7,0.3])

In [21]:
train.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(13459,[0,1,2,4,4...|  1.0|
|(13459,[0,1,2,6,5...|  1.0|
|(13459,[0,1,2,8,9...|  0.0|
|(13459,[0,1,2,13,...|  1.0|
|(13459,[0,1,2,13,...|  1.0|
|(13459,[0,1,2,13,...|  1.0|
|(13459,[0,1,2,14,...|  0.0|
|(13459,[0,1,2,16,...|  1.0|
|(13459,[0,1,2,21,...|  1.0|
|(13459,[0,1,2,21,...|  1.0|
|(13459,[0,1,2,21,...|  1.0|
|(13459,[0,1,2,44,...|  0.0|
|(13459,[0,1,2,50,...|  1.0|
|(13459,[0,1,3,10,...|  0.0|
|(13459,[0,1,4,53,...|  0.0|
|(13459,[0,1,5,15,...|  0.0|
|(13459,[0,1,6,21,...|  0.0|
|(13459,[0,1,8,9,1...|  0.0|
|(13459,[0,1,8,9,1...|  0.0|
|(13459,[0,1,8,16,...|  0.0|
+--------------------+-----+
only showing top 20 rows



In [22]:
spam_detector = nb.fit(train)

In [23]:
results = spam_detector.transform(test)

In [24]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [25]:
evaluator = MulticlassClassificationEvaluator(metricName='accuracy',labelCol='label')

In [26]:
results_metric = evaluator.evaluate(results)

In [32]:
results_metric

0.9161640530759951

In [33]:
from pyspark.ml.classification import GBTClassifier,RandomForestClassifier

In [36]:
gbt = GBTClassifier(maxDepth=2,featuresCol='features',labelCol='label')
rfc = RandomForestClassifier(numTrees=100,featuresCol='features',labelCol='label')

Exception ignored in: <bound method JavaParams.__del__ of GBTClassifier_492f830af75ffd71c9a9>
Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/pyspark/ml/wrapper.py", line 76, in __del__
    SparkContext._active_spark_context._gateway.detach(self._java_obj)
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1870, in detach
AttributeError: 'NoneType' object has no attribute '_detach'
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:37094)
Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 827, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zi

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:37094)

In [31]:
fit1 = gbt.fit(train)
fit2 = rfc.fit(train)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:37094)
Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 827, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 963, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:37094)

In [None]:
results1 = fit1.transform(test)
results2 = fit2.transform(test)

In [None]:
evaluator.evaluate(results1)

In [None]:
evaluator.evaluate(results2)