In [40]:

# Import the PySpark module
from pyspark.sql import SparkSession

# Build a SparkSession for this notebook, or reuse one that is already running.
# 'local[*]' runs Spark on this machine; 'test' is the application name.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('test')
    .getOrCreate()
)





In [41]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

import os

# Location of the SMS corpus. Set the SMS_CSV_PATH environment variable to run
# the notebook on another machine; when it is unset, the original author's
# path is used, so existing behaviour is unchanged.
sms_path = os.environ.get("SMS_CSV_PATH", "/Users/Rafael/Desktop/data/sms.csv")

# Specify column names and types explicitly instead of letting Spark infer them
schema = StructType([
    StructField("id", IntegerType()),
    StructField("text", StringType()),
    StructField("label", IntegerType())
])

# Load data from a ';'-delimited file that has no header row
sms = spark.read.csv(sms_path, sep=';', header=False, schema=schema)

# Print schema of DataFrame
sms.printSchema()

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



In [42]:
# Preview the first five messages.
sms.show(n=5)

+---+--------------------+-----+
| id|                text|label|
+---+--------------------+-----+
|  1|Sorry, I'll call ...|    0|
|  2|Dont worry. I gue...|    0|
|  3|Call FREEPHONE 08...|    1|
|  4|Win a 1000 cash p...|    1|
|  5|Go until jurong p...|    0|
+---+--------------------+-----+
only showing top 5 rows



In [43]:
# Count the rows once, then report the total.
record_count = sms.count()
print("The data contain %d records." % record_count)

The data contain 5574 records.


# Data Preparation

In [44]:
## Import regexp_replace (regex-based text substitution) and the Tokenizer feature transformer.
from pyspark.sql.functions import regexp_replace # ================> mini language for pattern matching to remove punctuation symbols
from pyspark.ml.feature import Tokenizer

In [45]:
# Replace, in order, (1) punctuation symbols and (2) digits in the 'text'
# column with a single space. Each pass builds on the result of the previous one.
wrangled = sms
for pattern in ('[_():;,.!?\\-]', '[0-9]'):
    wrangled = wrangled.withColumn(
        'text',
        regexp_replace(wrangled['text'], pattern, ' '),
    )

In [46]:
## Collapse every run of consecutive spaces into a single space
text_column = wrangled['text']
wrangled = wrangled.withColumn('text', regexp_replace(text_column, ' +', ' '))

In [47]:
## Tokenize the 'text' column into a new 'words' array column.
tokenizer = Tokenizer(inputCol='text', outputCol='words')
wrangled = tokenizer.transform(wrangled)

# Show full (untruncated) rows so the token lists are readable.
wrangled.show(n=4, truncate=False)

+---+----------------------------------+-----+------------------------------------------+
|id |text                              |label|words                                     |
+---+----------------------------------+-----+------------------------------------------+
|1  |Sorry I'll call later in meeting  |0    |[sorry, i'll, call, later, in, meeting]   |
|2  |Dont worry I guess he's busy      |0    |[dont, worry, i, guess, he's, busy]       |
|3  |Call FREEPHONE now                |1    |[call, freephone, now]                    |
|4  |Win a cash prize or a prize worth |1    |[win, a, cash, prize, or, a, prize, worth]|
+---+----------------------------------+-----+------------------------------------------+
only showing top 4 rows



In [51]:
### We cleaned up the handling of spaces in the data so that the tokenized text is neater.

### Import the StopWordsRemover, HashingTF and IDF classes.
from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF

### Filter stop words out of 'words'; the remaining tokens go into 'terms'.
stop_words_remover = StopWordsRemover(inputCol='words', outputCol='terms')
wrangled = stop_words_remover.transform(wrangled)

In [53]:
# Inspect the first row without truncating column contents.
wrangled.show(n=1, truncate=False)

+---+--------------------------------+-----+---------------------------------------+-----------------------------+
|id |text                            |label|words                                  |terms                        |
+---+--------------------------------+-----+---------------------------------------+-----------------------------+
|1  |Sorry I'll call later in meeting|0    |[sorry, i'll, call, later, in, meeting]|[sorry, call, later, meeting]|
+---+--------------------------------+-----+---------------------------------------+-----------------------------+
only showing top 1 row



In [55]:
#### The hashing trick is a fast, space-efficient way to map a very large
#### (possibly unbounded) vocabulary -- here, every word in the SMS messages --
#### onto a fixed, smaller number of buckets.
#### Hash the 'terms' into 1024 buckets, producing a term-frequency vector in 'hash'.
hashing_tf = HashingTF(inputCol='terms', outputCol='hash', numFeatures=1024)
wrangled = hashing_tf.transform(wrangled)

In [56]:
# Inspect the first row without truncating column contents.
wrangled.show(n=1, truncate=False)

+---+--------------------------------+-----+---------------------------------------+-----------------------------+-------------------------------------------+
|id |text                            |label|words                                  |terms                        |hash                                       |
+---+--------------------------------+-----+---------------------------------------+-----------------------------+-------------------------------------------+
|1  |Sorry I'll call later in meeting|0    |[sorry, i'll, call, later, in, meeting]|[sorry, call, later, meeting]|(1024,[138,344,378,1006],[1.0,1.0,1.0,1.0])|
+---+--------------------------------+-----+---------------------------------------+-----------------------------+-------------------------------------------+
only showing top 1 row



In [57]:
#### TF-IDF reflects how important a word is to each document: it combines the
#### word's frequency within the document with how common the word is across
#### all documents in the collection.
#### Fit the IDF weights on the hashed term frequencies, then rescale them into 'features'.
# NOTE(review): the IDF model is fitted on the full dataset, before the
# train/test split performed later -- confirm this leakage is acceptable.
idf_model = IDF(inputCol='hash', outputCol='features').fit(wrangled)
tf_idf = idf_model.transform(wrangled)

In [61]:
# Show the cleaned terms next to their TF-IDF vectors for the first three rows.
terms_and_features = tf_idf.select('terms', 'features')
terms_and_features.show(n=3, truncate=False)

+-----------------------------+----------------------------------------------------------------------------------------------------+
|terms                        |features                                                                                            |
+-----------------------------+----------------------------------------------------------------------------------------------------+
|[sorry, call, later, meeting]|(1024,[138,344,378,1006],[2.2391682769656747,2.892706319430574,3.684405173719015,4.244020961654438])|
|[dont, worry, guess, busy]   |(1024,[53,233,329,858],[4.618714411095849,3.557143394108088,4.618714411095849,4.937168142214383])   |
|[call, freephone]            |(1024,[138,396],[2.2391682769656747,3.3843005812686773])                                            |
+-----------------------------+----------------------------------------------------------------------------------------------------+
only showing top 3 rows



#  Build the Logistic Regression model

In [63]:
##### Randomly split the TF-IDF data 80/20 into training and testing sets.
##### The fixed seed keeps the split repeatable.
sms_train, sms_test = tf_idf.randomSplit(weights=[0.8, 0.2], seed=13)


In [65]:
##### Configure a regularised logistic regression estimator (regParam=0.2),
##### then fit it to the training data to obtain the model.
from pyspark.ml.classification import LogisticRegression
logistic_estimator = LogisticRegression(regParam=0.2)
logistic = logistic_estimator.fit(sms_train)

In [66]:
##### Apply the fitted model to the held-out testing data
prediction = logistic.transform(dataset=sms_test)

In [67]:
##### Build a confusion matrix: count rows for every (known label, predicted label) pair
confusion_counts = prediction.groupBy('label', 'prediction').count()
confusion_counts.show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   47|
|    0|       0.0|  987|
|    1|       1.0|  124|
|    0|       1.0|    3|
+-----+----------+-----+



In [None]:
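###### The classifier rarely flags legitimate messages as spam (3 of 990; precision 124/127 ≈ 98%), but it still misses 47 of the 171 spam messages (recall ≈ 73%), so some spam will get through.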