In [1]:
import string

from pyspark.sql import SparkSession

from pyspark.sql.functions import col, when, length, size, split, udf, rand, size, regexp_replace
from pyspark.sql.types import IntegerType, StringType

from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer,StringIndexer, RegexTokenizer,StopWordsRemover

from pyspark.ml.classification import NaiveBayes, RandomForestClassifier, LogisticRegression, DecisionTreeClassifier, GBTClassifier

from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Spark Session

In [2]:
spark = SparkSession.builder.getOrCreate()

23/07/19 14:20:02 WARN Utils: Your hostname, Ordenador-portatil-de-Javier.local resolves to a loopback address: 127.0.0.1; using 192.168.0.28 instead (on interface en0)
23/07/19 14:20:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/19 14:20:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Data

## Import
[Data Process](../../Data_Process/train_data_process.ipynb)

In [3]:
# hdfs_path = 'hdfs://ruta/archivo.csv'

data = spark.read.json("../../../kaggle/data_json")
data.show()

                                                                                

+------+--------------------+
|target|                text|
+------+--------------------+
|     0|Lawsuit seeks inv...|
|     0|There are very fe...|
|     0|USA has gone to H...|
|     0|AOC is a Puerto R...|
|     0|Men: wOMen be OuT...|
|     1|Then again, he do...|
|     1|@gisewaaa we must...|
|     1|cum gurgling fagg...|
|     0|@dhiggins63 Satan...|
|     1|Fuck you, you ret...|
|     1|You next on the l...|
|     1|Lmaooooo niggas a...|
|     0|Everyday in Kashm...|
|     1|Look at this fagg...|
|     0|Britian needs to ...|
|     0|Replace the word ...|
|     1|These niggas Got ...|
|     0|Wow you sound lik...|
|     1|Fuck you've got a...|
|     0|The State Departm...|
+------+--------------------+
only showing top 20 rows



In [4]:
data = data.dropna()
data.count()

                                                                                

161337

## Train Test Split

In order to include all three dataframes in the training stage, the data is randomly mixed.

In [5]:
data = data.orderBy(rand(seed=43))

The data is divided in train and test. Being the train the 80% of the total data.

In [6]:
split_index = int(data.count() * 0.8)

df_train = data.limit(split_index)
df_test = data.subtract(df_train)

# Text Cleaning

Once the data analysis has been completed in Jupyter, the text cleaning stage is started.

Tweets require lots of cleaning but it is inefficient to clean every single tweet because that would consume too much time. A general approach must be implemented for cleaning.

* The most common type of words that require cleaning in oov have punctuations at the start or end. Those words doesn't have embeddings because of the trailing punctuations. Punctuations #, @, !, ?, +, &, -, $, =, <, >, |, {, }, ^, ', (, ),[, ], *, %, ..., ', ., :, ; are separated from words
* Special characters that are attached to words are removed completely
* Contractions are expanded
* Urls are removed
* Character entity references are replaced with their actual symbols
* Typos and slang are corrected, and informal abbreviations are written in their long forms
* Some words are replaced with their acronyms and some words are grouped into one
* Finally, hashtags and usernames contain lots of information about the context but they are written without spaces in between words so they don't have embeddings. Informational usernames and hashtags should be expanded but there are too many of them. Due to the project deadline, hashtags and usernames haven't been expanded in detail, a list of expanded usernames was taken in order to achive this.

In [9]:
replace_text = udf(lambda text: 
                    text.replace("å_", "")
                        .replace("fromåÊwounds", "from wounds")
                        .replace("åÊ", "")
                        .replace("åÈ", "")
                        .replace("JapÌ_n", "Japan")
                        .replace("Ì©", "e")
                        .replace("å¨", "")
                        .replace("SuruÌ¤", "Suruc")
                        .replace("åÇ", "")
                        .replace("å£3million", "3 million")
                        .replace("åÀ", "")
                        .replace("he's", "he is")
                        .replace("there's", "there is")
                        .replace("We're", "We are")
                        .replace("That's", "That is")
                        .replace("won't", "will not")
                        .replace("they're", "they are")
                        .replace("Can't", "Cannot")
                        .replace("wasn't", "was not")
                        .replace("aren't", "are not")
                        .replace("isn't", "is not")
                        .replace("What's", "What is")
                        .replace("haven't", "have not")
                        .replace("hasn't", "has not")
                        .replace("There's", "There is")
                        .replace("He's", "He is")
                        .replace("It's", "It is")
                        .replace("You're", "You are")
                        .replace("I'M", "I am")
                        .replace("shouldn't", "should not")
                        .replace("wouldn't", "would not")
                        .replace("i'm", "I am")
                        .replace("I'm", "I am")
                        .replace("Isn't", "is not")
                        .replace("Here's", "Here is")
                        .replace("you've", "you have")
                        .replace("we're", "we are")
                        .replace("what's", "what is")
                        .replace("couldn't", "could not")
                        .replace("we've", "we have")
                        .replace("who's", "who is")
                        .replace("y'all", "you all")
                        .replace("would've", "would have")
                        .replace("it'll", "it will")
                        .replace("we'll", "we will")
                        .replace("We've", "We have")
                        .replace("he'll", "he will")
                        .replace("Y'all", "You all")
                        .replace("Weren't", "Were not")
                        .replace("Didn't", "Did not")
                        .replace("they'll", "they will")
                        .replace("they'd", "they would")
                        .replace("DON'T", "DO NOT")
                        .replace("they've", "they have")
                        .replace("i'd", "I would")
                        .replace("should've", "should have")
                        .replace("where's", "where is")
                        .replace("we'd", "we would")
                        .replace("i'll", "I will")
                        .replace("weren't", "were not")
                        .replace("They're", "They are")
                        .replace("let's", "let us")
                        .replace("it's", "it is")
                        .replace("can't", "cannot")
                        .replace("don't", "do not")
                        .replace("you're", "you are")
                        .replace("i've", "I have")
                        .replace("that's", "that is")
                        .replace("i'll", "I will")
                        .replace("doesn't", "does not")
                        .replace("i'd", "I would")
                        .replace("didn't", "did not")
                        .replace("ain't", "am not")
                        .replace("you'll", "you will")
                        .replace("I've", "I have")
                        .replace("Don't", "do not")
                        .replace("I'll", "I will")
                        .replace("I'd", "I would")
                        .replace("Let's", "Let us")
                        .replace("you'd", "You would")
                        .replace("It's", "It is")
                        .replace("Ain't", "am not")
                        .replace("Haven't", "Have not")
                        .replace("Could've", "Could have")
                        .replace("youve", "you have")
                        .replace("donå«t", "do not")
                        .replace("@", " @ ")
                        .replace("#", " # ")
                        .replace("!", " ! ")
                        .replace("?", " ? ")
                        .replace("+", " + ")
                        .replace("&", " & ")
                        .replace("*", " * ")
                        .replace("[", " [ ")
                        .replace("]", " ] ")
                        .replace("-", " - ")
                        .replace("%", " % ")
                        .replace(".", " . ")
                        .replace(":", " : ")
                        .replace("/", " / ")
                        .replace("(", " ( ")
                        .replace(")", " ) ")
                        .replace(";", " ; ")
                        .replace("$", " $ ")
                        .replace("=", " = ")
                        .replace(">", " > ")
                        .replace("<", " < ")
                        .replace("|", " | ")
                        .replace("{", " { ")
                        .replace("}", " } ")
                        .replace("^", " ^ ")
                        .replace("'", " ' ")
                        .replace("`", " ` ")
                        .replace("...", " ... ")
                        .replace("..", " ... ") if text is not None else None, StringType())



df_train = df_train.withColumn("text", replace_text(df_train["text"]))
df_test = df_test.withColumn("text", replace_text(df_test["text"]))

# Mostrar el DataFrame transformado
df_train.show(5)
df_test.show(5)

                                                                                

+------+--------------------+
|target|                text|
+------+--------------------+
|     0|I, a Catholic and...|
|     1|ayo i even kill h...|
|     0|Trans rights are ...|
|     0|the eu siding wit...|
|     0|   #   porn,   # ...|
+------+--------------------+
only showing top 5 rows





+------+--------------------+
|target|                text|
+------+--------------------+
|     0|Some women are ju...|
|     0|Free Tay K   ?   ...|
|     1|   *   Insert edg...|
|     0|   @   lizforus L...|
|     1|   &   amp   ;   ...|
+------+--------------------+
only showing top 5 rows



                                                                                

# Preprocess

## Tokenizer 

* **Tokenizer:** <br>
The Tokenizer is a feature transformer that takes an input text column and splits it into individual words or tokens. It is used to preprocess the text data before applying any machine learning algorithms. In this case, the input text column is "text_cleaned" which contains the preprocessed and cleaned text data. The Tokenizer transforms the "text_cleaned" column into a new column called "tokens" where each row contains an array of tokens (words).

In [10]:
regex_tokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")
raw_words_train = regex_tokenizer.transform(df_train)
raw_words_test = regex_tokenizer.transform(df_test)
raw_words_train.show(5)
raw_words_test.show(5)

+------+--------------------+--------------------+
|target|                text|               words|
+------+--------------------+--------------------+
|     0|I, a Catholic and...|[i, a, catholic, ...|
|     1|ayo i even kill h...|[ayo, i, even, ki...|
|     0|Trans rights are ...|[trans, rights, a...|
|     0|the eu siding wit...|[the, eu, siding,...|
|     0|   #   porn,   # ...|[porn, android, i...|
+------+--------------------+--------------------+
only showing top 5 rows



                                                                                

+------+--------------------+--------------------+
|target|                text|               words|
+------+--------------------+--------------------+
|     0|Some women are ju...|[some, women, are...|
|     0|Free Tay K   ?   ...|[free, tay, k, th...|
|     1|   *   Insert edg...|[insert, edgy, co...|
|     0|   @   lizforus L...|[lizforus, legal,...|
|     1|   &   amp   ;   ...|[amp, these, bitc...|
+------+--------------------+--------------------+
only showing top 5 rows



## Stop Words

* **StopWords:** <br>
Stop words are commonly used words in a language that typically do not carry much meaning or contribute significantly to the overall understanding of a text. Examples of stop words in English include "the", "is", "and", "a", and "an". These words are often filtered out or removed from text data during natural language processing tasks, such as text classification or sentiment analysis.

In [11]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
words_df_train = remover.transform(raw_words_train)
words_df_test = remover.transform(raw_words_test)
words_df_train.show(5)
words_df_test.show(5)

                                                                                

+------+--------------------+--------------------+--------------------+
|target|                text|               words|            filtered|
+------+--------------------+--------------------+--------------------+
|     0|I, a Catholic and...|[i, a, catholic, ...|[catholic, jesuit...|
|     1|ayo i even kill h...|[ayo, i, even, ki...|[ayo, even, kill,...|
|     0|Trans rights are ...|[trans, rights, a...|[trans, rights, h...|
|     0|the eu siding wit...|[the, eu, siding,...|[eu, siding, coun...|
|     0|   #   porn,   # ...|[porn, android, i...|[porn, android, i...|
+------+--------------------+--------------------+--------------------+
only showing top 5 rows

+------+--------------------+--------------------+--------------------+
|target|                text|               words|            filtered|
+------+--------------------+--------------------+--------------------+
|     0|Some women are ju...|[some, women, are...|[women, talented,...|
|     0|Free Tay K   ?   ...|[free, tay

## Count Vectorizer

* **CountVectorizer:** <br>
The CountVectorizer is a feature transformer that converts a collection of text documents into a matrix of token counts. It takes an input column of tokens and outputs a sparse vector representation of the token counts. In this case, the input column is "tokens" which contains the array of tokens generated by the Tokenizer. The CountVectorizer learns a vocabulary of distinct tokens from the training data and represents each document as a vector of token counts.

In [12]:
cv = CountVectorizer(inputCol="filtered", outputCol="features")

# train
model_train = cv.fit(words_df_train)
countVectorizer_train = model_train.transform(words_df_train)
countVectorizer_train = countVectorizer_train.withColumn("label",col('target'))
countVectorizer_train.show(5)

# test
model_test = cv.fit(words_df_test)
countVectorizer_test = model_test.transform(words_df_test)
countVectorizer_test= countVectorizer_test.withColumn("label",col('target'))
countVectorizer_test.show(5)

                                                                                

+------+--------------------+--------------------+--------------------+--------------------+-----+
|target|                text|               words|            filtered|            features|label|
+------+--------------------+--------------------+--------------------+--------------------+-----+
|     0|I, a Catholic and...|[i, a, catholic, ...|[catholic, jesuit...|(62068,[1,37,44,2...|    0|
|     1|ayo i even kill h...|[ayo, i, even, ki...|[ayo, even, kill,...|(62068,[15,31,33,...|    1|
|     0|Trans rights are ...|[trans, rights, a...|[trans, rights, h...|(62068,[27,49,60]...|    0|
|     0|the eu siding wit...|[the, eu, siding,...|[eu, siding, coun...|(62068,[22,41,149...|    0|
|     0|   #   porn,   # ...|[porn, android, i...|[porn, android, i...|(62068,[7,63,90,9...|    0|
+------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows





+------+--------------------+--------------------+--------------------+--------------------+-----+
|target|                text|               words|            filtered|            features|label|
+------+--------------------+--------------------+--------------------+--------------------+-----+
|     0|Some women are ju...|[some, women, are...|[women, talented,...|(21455,[6,23,33,3...|    0|
|     0|Free Tay K   ?   ...|[free, tay, k, th...|[free, tay, k, ni...|(21455,[20,36,204...|    0|
|     1|   *   Insert edg...|[insert, edgy, co...|[insert, edgy, co...|(21455,[549,579,6...|    1|
|     0|   @   lizforus L...|[lizforus, legal,...|[lizforus, legal,...|(21455,[232,819,9...|    0|
|     1|   &   amp   ;   ...|[amp, these, bitc...|[amp, bitches, bo...|(21455,[4,5,36,71...|    1|
+------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



                                                                                

# Models
As evaluation metrics ROC and Accuracy will be used. <br>

* **ROC curve** is a graphical representation that shows the performance of a binary classification model as the decision threshold is varied. On the x-axis, the false positive rate (FPR) is plotted, which is the proportion of negative instances incorrectly classified as positive. On the y-axis, the true positive rate (TPR) is plotted, which is the proportion of positive instances correctly classified as positive.



Separate train and validate data

In [13]:
(train, validate) = countVectorizer_train.randomSplit([0.8, 0.2],seed = 42)

In [24]:
trainData = countVectorizer_train
testData = countVectorizer_test

## Naive Bayes

In [14]:
nb = NaiveBayes(modelType="multinomial",labelCol="label", featuresCol="features")
nbModel = nb.fit(train)
nb_predictions = nbModel.transform(validate)

                                                                                

In [15]:
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(nb_predictions))

23/07/19 14:24:19 WARN DAGScheduler: Broadcasting large task binary with size 1781.0 KiB
                                                                                

Test Area Under ROC 0.5489994745013782


In [16]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
nb_accuracy = evaluator.evaluate(nb_predictions)
print("Accuracy of NaiveBayes is = %g"% (nb_accuracy))

23/07/19 14:24:26 WARN DAGScheduler: Broadcasting large task binary with size 1793.6 KiB
23/07/19 14:24:27 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


Accuracy of NaiveBayes is = 0.779466


                                                                                

## Decision Tree

In [17]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'target', maxDepth = 3)
dtModel = dt.fit(train)
dtPreds = dtModel.transform(validate)
dtPreds.show(5)

23/07/19 14:24:47 WARN DAGScheduler: Broadcasting large task binary with size 1453.8 KiB
23/07/19 14:25:10 WARN DAGScheduler: Broadcasting large task binary with size 1836.8 KiB
23/07/19 14:25:11 WARN MemoryStore: Not enough space to cache rdd_399_0 in memory! (computed 149.3 MiB so far)
23/07/19 14:25:11 WARN BlockManager: Persisting block rdd_399_0 to disk instead.
23/07/19 14:26:19 WARN MemoryStore: Not enough space to cache rdd_399_0 in memory! (computed 339.9 MiB so far)
23/07/19 14:26:51 WARN DAGScheduler: Broadcasting large task binary with size 1837.5 KiB
23/07/19 14:26:51 WARN MemoryStore: Not enough space to cache rdd_399_0 in memory! (computed 339.9 MiB so far)
23/07/19 14:27:23 WARN DAGScheduler: Broadcasting large task binary with size 1838.0 KiB
23/07/19 14:27:24 WARN MemoryStore: Not enough space to cache rdd_399_0 in memory! (computed 339.9 MiB so far)
                                                                                

+------+--------------------+--------------------+--------------------+--------------------+-----+-----------------+--------------------+----------+
|target|                text|               words|            filtered|            features|label|    rawPrediction|         probability|prediction|
+------+--------------------+--------------------+--------------------+--------------------+-----+-----------------+--------------------+----------+
|     0|   !      !      ...|[rt, urkindofbran...|[rt, urkindofbran...|(62068,[3,4,13,25...|    0|[62818.0,28858.0]|[0.68521750512675...|       0.0|
|     0|   !    thank u  ...|[thank, u, im, tr...|[thank, u, im, tr...|(62068,[9,18,29,3...|    0|[62818.0,28858.0]|[0.68521750512675...|       0.0|
|     0|   !    thank u  ...|[thank, u, im, tr...|[thank, u, im, tr...|(62068,[9,18,29,3...|    0|[62818.0,28858.0]|[0.68521750512675...|       0.0|
|     0|   #      #   Who...|[who, cares, prot...|[cares, protect, ...|(62068,[110,499,5...|    0|[62818.0

In [18]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
dt_accuracy = evaluator.evaluate(dtPreds)
print("Accuracy of Decision Trees is = %g"% (dt_accuracy))

                                                                                

Accuracy of Decision Trees is = 0.70197


## GBT Classifier

In [20]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(train)
gbtPreds = gbtModel.transform(validate)
gbtPreds.show(5)

23/07/19 14:34:24 WARN DAGScheduler: Broadcasting large task binary with size 1453.5 KiB
23/07/19 14:34:52 WARN DAGScheduler: Broadcasting large task binary with size 1841.6 KiB
23/07/19 14:34:53 WARN MemoryStore: Not enough space to cache rdd_606_0 in memory! (computed 149.3 MiB so far)
23/07/19 14:34:53 WARN BlockManager: Persisting block rdd_606_0 to disk instead.
23/07/19 14:36:01 WARN MemoryStore: Not enough space to cache rdd_606_0 in memory! (computed 339.9 MiB so far)
23/07/19 14:36:01 WARN MemoryStore: Not enough space to cache rdd_606_0 in memory! (computed 26.8 MiB so far)
23/07/19 14:36:58 WARN DAGScheduler: Broadcasting large task binary with size 1842.4 KiB
23/07/19 14:36:59 WARN MemoryStore: Not enough space to cache rdd_606_0 in memory! (computed 339.9 MiB so far)
23/07/19 14:37:33 WARN DAGScheduler: Broadcasting large task binary with size 1842.9 KiB
23/07/19 14:37:34 WARN MemoryStore: Not enough space to cache rdd_606_0 in memory! (computed 339.9 MiB so far)
23/07/19 

+------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|target|                text|               words|            filtered|            features|label|       rawPrediction|         probability|prediction|
+------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|     0|   !      !      ...|[rt, urkindofbran...|[rt, urkindofbran...|(62068,[3,4,13,25...|    0|[0.38832960984962...|[0.68495965305164...|       0.0|
|     0|   !    thank u  ...|[thank, u, im, tr...|[thank, u, im, tr...|(62068,[9,18,29,3...|    0|[0.46945988608090...|[0.71888140479252...|       0.0|
|     0|   !    thank u  ...|[thank, u, im, tr...|[thank, u, im, tr...|(62068,[9,18,29,3...|    0|[0.46945988608090...|[0.71888140479252...|       0.0|
|     0|   #      #   Who...|[who, cares, prot...|[cares, protect, ...|(62068,[110,499,5

                                                                                

In [21]:
gbtEval = BinaryClassificationEvaluator()
gbtROC = gbtEval.evaluate(gbtPreds, {gbtEval.metricName: "areaUnderROC"})
print("Test Area Under ROC: " + str(gbtROC))

                                                                                

Test Area Under ROC: 0.7921680466831243


In [22]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
gb_accuracy = evaluator.evaluate(gbtPreds)
print("Accuracy of GBT is = %g"% (gb_accuracy))

[Stage 487:>                                                        (0 + 1) / 1]

Accuracy of GBT is = 0.759571


                                                                                

# Prediction

GBTClassifier is used to make predictions because it has the best evaluation metrics

In [None]:
gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(trainData)
gbtPreds = gbtModel.transform(testData)
predictions = gbtPreds.select('id','prediction')
predictions.show(5)

[Stage 649:>                                                        (0 + 1) / 1]