<a href="https://colab.research.google.com/github/Anastasios-K/Pyspark/blob/master/Spam_Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install gitpython
!pip install pyspark



In [0]:
from git import Repo
import zipfile
import shutil
import tempfile

import pyspark
import os
import re
from nltk.corpus import stopwords
import nltk
import numpy as np
from pyspark import StorageLevel
from pyspark.mllib.feature import IDF, Normalizer
import string
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, SVMWithSGD
import time

In [0]:
# Update to a specific Java version which is compatible with Pyspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

# Comment out and run the code below to have a look at the Java version
# !java -version

In [0]:
unique_dir = tempfile.mkdtemp(prefix="Spam_Detection", dir = os.getcwd()) # create a unique directory (its name starts with Spam_Detection) to clone the dataset
Repo.clone_from("https://github.com/Anastasios-K/Pyspark.git", unique_dir) # clone step

os.chdir(unique_dir)
zipfile.ZipFile("lingspam_public.zip", mode="r").extractall("target_dir")
os.chdir("target_dir/lingspam_public/")

dir_list = os.listdir(os.getcwd())

In [0]:
# comment out to make sure that we are in the correct directory
# we need to have access to ('readme.txt', 'stop', 'lemm', 'bare', 'lemm_stop')

# os.listdir()

In [0]:
sc = pyspark.SparkContext.getOrCreate() # Initiate Pyspark context

In [0]:
""" Create the required RDDs depending on the text files """

def Create_RDDs(directory):
    pathstring = os.path.abspath(directory)
    tr_RDD = sc.wholeTextFiles(pathstring + "/part[1-9]/") # compose a training set using the first 9 folders
    ts_RDD = sc.wholeTextFiles(pathstring + "/part10/") # keep the last forlder for test set

    ts_RDD2 = ts_RDD.map(lambda text: (re.split('[/.]', text[0])[-2], text[1])) 
    tr_RDD2 = tr_RDD.map(lambda text: (re.split('[/.]', text[0])[-2], text[1]))

    ts_RDD2.cache()
    tr_RDD2.cache()
    pairs = (directory, tr_RDD2, ts_RDD2)
    return(pairs)

In [0]:
pair_list = [Create_RDDs(x) for x in dir_list if ".txt" not in x] # get the RDDs linked to each directory

pair_dict = {pair_list[x][0]:(pair_list[x][1], pair_list[x][2]) for x in range(len(pair_list))} # create a dictionary with the directories and the corresponding RDDs

train_Rdd, test_Rdd = pair_dict['bare']

partial_Rdd = train_Rdd.sample(True, 0.1, seed=1) # 10% of the train_Rdd to be used in the sections below for facilitation

# Comment out and run the code below to have a look at the outcome
# WARNING!!! it takes some time

# print(train_Rdd.count())
# print(test_Rdd.count())
# print(train_Rdd.take(1))
# print(test_Rdd.take(1))
# print(partial_Rdd.take(1))

In [11]:
""" Functions for preprocessing """

def Tokenisation(text): # Breake down into tokens
    nltk.download('punkt')
    return(nltk.word_tokenize(text))

def Punct_removal(tokens): # Remmove punctuation (the most efficient method)
    tokens2 = [token.strip(string.punctuation) for token in tokens]
    return(tokens2)

def RDD_preparation(rdd): # Implement the entire preprocessing
    rdd_vals1 = rdd.values() # Get values ONLY. 
    rdd_vals2 = rdd_vals1.map(Tokenisation) # Tokenise the values
    rdd_vals3 = rdd_vals2.map(Punct_removal) # remove punctuation
    rdd_vals4 = rdd.keys().zip(rdd_vals3) # match the kaeys nad values again
    RDD_final = rdd_vals4.map(lambda string: (string[0], list(filter(None, string[1])))) # filter out empty strings
    return(RDD_final)

partial_Rdd1 = RDD_preparation(partial_Rdd) # use the partial_Rdd to test the fucntionality

# Comment out and run the code below to have a look at the outcome

# print(partial_Rdd1.count())
print(partial_Rdd1.take(1))

[('9-536msg1', ['Subject', 'linguistics', '36', '1', '1998', 'linguistics', 'volume', '36', '1', '1998', 'mouton', 'de', 'gruyter', 'berlin', 'new', 'york', 'andrew', 'spencer', 'and', 'marina', 'zaretskaya', 'verb', 'prefixation', 'in', 'russian', 'as', 'lexical', 'subordination', 'thomas', 'berg', 'the', 'resolution', 'of', 'number', 'conflicts', 'in', 'english', 'and', 'german', 'agreement', 'patterns', 'kersti', 'borjars', 'and', 'carol', 'chapman', 'agreement', 'and', 'pro-drop', 'in', 'some', 'dialects', 'of', 'english', 'jose', 'hualde', 'a', 'gap', 'filled', 'postpostinitial', 'accent', 'in', 'azkoitia', 'basque', 'robin', 'hooper', 'universals', 'of', 'narrative', 'pragmatics', 'a', 'polynesian', 'case', 'study', 'nikolaus', 'p', 'himmelmann', 'documentary', 'and', 'descriptive', 'linguistics', 'book', 'reviews', 'notice', 'from', 'the', 'board', 'of', 'editors', 'mouton', 'de', 'gruyter', 'walter', 'de', 'gruyter', 'inc', 'postfach', '30', '34', '21', '200', 'saw', 'mill', 'r

In [0]:
# Create a fixed-size vector from a word list

def Hashing_vectors(text,dimensions): # arguments: the list and the size of the output vector
    vector = np.zeros(dimensions)  # create vector of 0s
    for word in text:
        vector[hash(word) % dimensions] += 1 # add 1 at the hashed address 
    return(vector) # return hashed word vector

def Norm_Tfidf(rdd, dimensions):
    keys_RDD = rdd.keys()
    vals_RDD = rdd.values()

    # create vectors
    vector_RDD = vals_RDD.map(lambda tokens: Hashing_vectors(tokens, dimensions))
    vector_RDD.persist(StorageLevel.MEMORY_ONLY) # save in memeory only to accelarate the process

    tfidf_RDD = IDF().fit(vector_RDD).transform(vector_RDD) # implement TF.IDF algorithm
    norm_tfidf_RDD = Normalizer().transform(tfidf_RDD) # round the hashing values to 1
    final_tfidf_RDD = keys_RDD.zip(norm_tfidf_RDD) # match the corresponding rdd keys
    return(final_tfidf_RDD)
    

dimensions = 20 # use low dimensionality value to test
partial_rdd2 = Norm_Tfidf(partial_Rdd1, dimensions) # use the latest partial_Rdd(1) to check the functionality

# Comment out and run the code below to have a look at the outcome
# print(partial_rdd2.take(1))

In [14]:
""" For user to get a visual representation of the RDD """
# comment out and run the code below to get a line by line visual representation

# rdd_sample = partial_rdd2.take(100)
# for x in rdd_sample:
#   print(x,"\n")

' For user to get a visual representation of the RDD '

In [16]:
# Generate the target labels (whether it is SPAM or NOT)
# NOTE: 1 --> SPAM    &    0 --> NO SPAM

def Create_Labels(rdd): 
    class_vector_RDD = rdd.map(lambda label: (1 if (label[0].startswith('spmsg')) else 0, label[1])) # detect spam based on "spmsg" text at the beginning
    target_RDD = class_vector_RDD.map(lambda cv: LabeledPoint(cv[0],cv[1]) ) 
    return(target_RDD)

final_partial = Create_Labels(partial_rdd2)

# Comment out and run the code below to have a look at the outcome
# print(final_partial.take(1))

[LabeledPoint(0.0, [0.154113033776224,0.08390598470004514,0.2526542937375078,0.1687506411573645,0.07005137898919273,0.056041103191354184,0.42667700524539587,0.048945157741692995,0.07044444113822321,0.21015413696757818,0.028020551595677092,0.3194001004972459,0.1263271468687539,0.4454715931166001,0.0,0.02796866156668171,0.08437532057868224,0.05593732313336342,0.06316357343437695,0.5504988510687896])]


In [0]:
""" Train and Test 2 algorithms """

def Train_Model(train_rdd):
    starting_point = time.perf_counter()
    print('Training process')
    print("-" * 80)
    model1 = LogisticRegressionWithLBFGS.train(train_rdd) # logistic regression
    print("Model 1 --> Logistic Regression")
    print(f"{type(model1)}")
    print("-" * 80)
    model2 = SVMWithSGD.train(train_rdd) # support vector machine
    print("Model 2 --> SVM")
    print(f"{type(model2)}")
    print("-" * 80)
    end = time.perf_counter()
    print("\n", f"Execution time --> {round(end - starting_point, 2)} in sec") # counting the execution time
    return (model1,model2)

def Test_Model(model, test_rdd):
    pred_and_targ = test_rdd.map(lambda element: (model.predict(element.features), element.label)) # get the prediction and ground truth (label) for each item.
    correct = pred_and_targ.filter(lambda label: label[0] == label[1]).count() # count the correct predictions 
    accuracy = correct / pred_and_targ.count()
    print(f"--> {accuracy} (data items: {pred_and_targ.count()}, correct: {correct})")
    return(accuracy)

# Comment out to check (final_partial is used once more to easily test the functionality)

# Log_Reg, SVM = Train_Model(final_partial)
# accur_evaluation = Test_Model(SVM, final_partial)

In [0]:
""" Apply all the above to the actual Training RDD ("Train_Rdd) which is created earlier """

def Preproc_Full(rdd,dimensions): # the functions below have been fully explained earlier
    train_Rdd1 = RDD_preparation(rdd)
    train_Rdd2 = Norm_Tfidf(train_Rdd1, dimensions)
    train_Rdd3 = Create_Labels(train_Rdd2)
    return(train_Rdd3)

new_dimensions = dimensions # dimensionality value may change BUT currently the previous number is used
final_TRAIN_rdd = Preproc_Full(train_Rdd, new_dimensions)
final_TEST_rdd = Preproc_Full(test_Rdd, new_dimensions)

# print(final_TRAIN_rdd.take(1))
# print(final_TEST_rdd.take(1))

In [19]:
def Train_Test_preocess(train_rdd,test_rdd):
    models = Train_Model(train_rdd)
    results = [[],[]] # matrix for 2 modes (training/test) vs n models (currently 3)
    for x, model in enumerate(models):
        print("\n", f"Model {x+1}")
        print('Training Accuracy')
        results[0].append(Test_Model(model, train_rdd))
        print("-" * 80)
        print('Test Accuracy')
        results[1].append(Test_Model(model, test_rdd))
    return(results)

model_evaluation = Train_Test_preocess(final_TRAIN_rdd, final_TEST_rdd)

Training process
--------------------------------------------------------------------------------
Model 1 --> Logistic Regression
<class 'pyspark.mllib.classification.LogisticRegressionModel'>
--------------------------------------------------------------------------------
Model 2 --> SVM
<class 'pyspark.mllib.classification.SVMModel'>
--------------------------------------------------------------------------------

 Execution time --> 79.29 in sec

 Model 1
Training Accuracy
--> 0.9062259800153728 (data items: 2602, correct: 2358)
--------------------------------------------------------------------------------
Test Accuracy
--> 0.7285223367697594 (data items: 291, correct: 212)

 Model 2
Training Accuracy
--> 0.8339738662567256 (data items: 2602, correct: 2170)
--------------------------------------------------------------------------------
Test Accuracy
--> 0.8316151202749141 (data items: 291, correct: 242)


In [0]:
# comment out and run the code below to DELETE the temporal direcory
# ---> WARNING <--- it will DELETE everything and RETURN the files and directories in the INITIAL situation

# os.chdir("/content")
# shutil.rmtree(unique_dir)