This notebook has following goals:
- Get difference between old and new wiki page
- Cleaning (lower cases, removing stopwords)
- Lemmatizing
- TF-IDF
- Feature engineering
- ... from: The acquired training data AND future incoming streaming instances

In [1]:
# Import libraries
from pyspark.streaming import StreamingContext
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:95% !important; }</style>"))
from threading import Thread
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit, regexp_replace, lower
from pyspark.sql.types import StringType
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from difflib import unified_diff
import difflib
import pandas as pd
from pyspark.sql.types import IntegerType, StringType
import re
import string
import nltk
from pyspark.sql.functions import monotonically_increasing_id 
import itertools
from pyspark.sql.functions import col

# 0. Get streaming instances
This section is only for the purpose to set up and do intermediate checks of the pipeline with live data.

In [2]:
# This only serves as trial to test my functions on live incoming instances
class StreamingThread(Thread):
    def __init__(self, ssc):
        Thread.__init__(self)
        self.ssc = ssc
    # Start stream
    def run(self):
        ssc.start()
        ssc.awaitTermination()
    # Stop stream
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

# ssc = StreamingContext(sc, 10) # Every 10 seconds, construct a mini-batch of RDDs
# lines = ssc.socketTextStream("seppe.net", 7778)
# lines.pprint()
# ssc_t = StreamingThread(ssc)
# ssc_t.start()

In [3]:
# ssc_t.stop()

# 1. Load in the collected data
This part only serves to load in our collected data: to then experiment and perform featurization, TF-IDF and construct classifier.

In [4]:
# Get Spark dataframe --> various sites state that a dataframe is better than a DDR for textual data
def get_wiki_df():
    # The path can be either a single text file or a directory storing text files. 
    # It's possible you'll need to change path to your own directory
    path = "../../data/*"
    wiki_df = spark.read.json(path)

    # Uncomment if you want its schema
    # wiki_df.printSchema()
    return wiki_df

## 1.1. Check the amount of label counts

In [5]:
def get_label_count(wiki_df):
    """""""""
    Last result (so you don't need to run it each time) 
    -> safe: 30333, unsafe: 4136, vandal: 270
    """""""""
    # Creates a temporary view using the wiki DataFrame
    wiki_df.createOrReplaceTempView("wikidata")                            
                                                                                  
    # Check the amount of safes/unsafes/vandals                                    
    label_df = spark.sql("SELECT label, count(*) FROM wikidata GROUP BY label")    
    return label_df.show()                                                         

## Helperfunctions

In [33]:
######################################################################################
# HELPER FUNCTIONS
######################################################################################

def show_X_rows(df, x): #--------------------------------> Shows first X number of spark dataframe rows
    # Convert list to RDD
    rdd = spark.sparkContext.parallelize(df.take(x))

    # Create data frame
    df_temp = spark.createDataFrame(rdd)
    return df_temp.show()

def flatten(nested_list):
    flat_list = []
    for sublist in nested_list:
        if type(sublist) == list:
            for item in sublist:
                flat_list.append(item)
        else:
            flat_list.append(sublist)
    return flat_list

######################################################################################
# FUNCTIONS TO GET DIFFERENCE BETWEEN OLD AND NEW TEXT
######################################################################################

# Get edited part of wiki page (credits to the professor)
def get_diff_1(old, new):
    return '\n'.join([ l for l in unified_diff(old.split('\n'), new.split('\n')) if l.startswith('+') or l.startswith('-') ])

# The difference function that will take a very long time to compute for all instances
def get_diff_2(old, new):
    deleted_words = []
    added_words = []
    temp_i = -100
    new_word = ''
    status = 'none'
    for i, s in enumerate(difflib.ndiff(old, new)):
        if s[0] == ' ':
            if status == 'adding':
                added_words.append(new_word)
                new_word = ''
            elif status == 'deleting':
                deleted_words.append(new_word)
                new_word = ''
            status = 'none'
            continue
        elif s[0] == '-':
            if status != 'deleting':
                added_words.append(new_word)
                new_word = ''
                status = 'deleting'
            new_word += s[-1]
        elif s[0] == '+':
            if status != 'adding':
                deleted_words.append(new_word)
                new_word = ''
                status = 'adding'
            new_word += s[-1]
    if new_word != '' and status == 'deleting':
        deleted_words.append(new_word)
    elif new_word != '' and status == 'adding':
        added_words.append(new_word)
    return {'added': added_words, 'deleted': deleted_words}

def get_deletions(del_add):
    return del_add['deleted']

def get_additions(del_add):
    return del_add['added']

def extract_differences(str_old, str_new):
    try:
        # Clean old and new wiki pages from html characters
        str_old, str_new = cleanhtml(str_old), cleanhtml(str_new)

        # Get big chunks of altered fragments
        diff1 = get_diff_1(str_old, str_new)


        # Clean the differences from unwanted characters to get only words
        diff2 = []
        for txt in diff1.split('\n'):
            diff2.append(''.join([x for x in txt if x in string.ascii_letters + '\'-+ 1234567890']).lower())

        # Find chunks that are related ('+' vs '-') else append them to a seperate list that doesn't need to get processed in the next part
        fully_added, fully_removed, partly_new, partly_old = find_similar_chunks(diff2)

        # Get the individual fragments that were added or deleted
        for i in range(0,len(partly_new)):
            difference = get_diff_2(partly_new[i], partly_old[i])
            for el in difference['added']:
                fully_added.append(el)
            for el in difference['deleted']:
                fully_removed.append(el)

        fully_removed = flatten(fully_removed)
        fully_added = flatten(fully_added)
        while '' in fully_added:
            fully_added.remove('')
        while '+' in fully_added:
            fully_added.remove('+')
        while '-' in fully_added:
            fully_added.remove('-')

        while '' in fully_removed:
            fully_removed.remove('')
        while '+' in fully_removed:
            fully_removed.remove('+')
        while '-' in fully_removed:
            fully_removed.remove('-')

        # Original output_lib was a dictionary
        #output_lib = {'added': fully_added, 'removed': fully_removed}

        output_str = ''
        # However, now we'd like to have the output in a list with seperator "|SEPERATIONLINEADDEDREMOVED|"
        for word in fully_added:
            output_str += '{} '.format(word)

        output_str += ' |SEPERATIONLINEADDEDREMOVED|'

        for word in fully_removed:
            output_str += ' {}'.format(word)

        full_difference = []

        return output_str
    except:
        return 'error_would_occur'

#     for i in output_lib['added']:
#         full_difference.append(i)
#     for i in output_lib['removed']:
#         full_difference.append(i)

#     return full_difference

def find_similar_chunks(chunk_list):
    fully_added, fully_removed, partly_removed, partly_added = [], [], [], []
    # Remove '', ---, +++
    while '' in chunk_list:
        chunk_list.remove('')
    while '+++ ' in chunk_list:
        chunk_list.remove('+++ ')
    while '--- ' in chunk_list:
        chunk_list.remove('--- ')
    for chunk in chunk_list:
        try:
            if chunk[3] not in string.ascii_letters + '\'-+ 1234567890':
                chunk_list.remove(chunk)
            else:
                continue
        except:
            chunk_list.remove(chunk)

    for a, b in itertools.combinations(chunk_list, 2):
        if a[1:30] == b[1:30]:
            if a[0] == '+':
                partly_added.append(a)
                partly_removed.append(b)
                chunk_list.remove(a)
                chunk_list.remove(b)
            else:
                partly_added.append(b)
                partly_removed.append(a)
                chunk_list.remove(a)
                chunk_list.remove(b)
    for rest in chunk_list:
        if rest[0] == '-':
            fully_removed.append(rest[1:].split(' '))
        elif rest[0] == '+':
            fully_added.append(rest[1:].split(' '))
    return fully_added, fully_removed, partly_removed, partly_added # The partly removed and partly added ones will be used as input to determine difference

######################################################################################
# FUNCTIONS TO CLEAN COLUMNS 'comment', 'title', 'user', 'text_old', 'text_new'
######################################################################################
def cleanhtml(raw_html):
    cleanr = re.compile('<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});')
    cleantext = re.sub(cleanr, '', raw_html)
    
    # Remove urls
    cleantext_no_urls = re.sub(r"http\S+", "", cleantext)
    return cleantext_no_urls

def cleantext(raw):
    # If there's nothing in raw, return 'EMPTY'
    for item in 'azertyuiopmlkjhgfdsqnbvcxw,;:=ùµ$^)àç!è§(é&1234567890\"\')|@#]}{[^-_':
        if item in raw:
    
            cleanr = re.compile('<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});')
            cleantext = re.sub(cleanr, '', raw)

            # Remove urls
            cleantext_no_urls = re.sub(r"http\S+", "", cleantext)
            return ''.join([x for x in cleantext_no_urls if x in string.ascii_letters + '\'-+ 1234567890']).lower()
    return 'empty'

def get_clean_df(df):
    # Remove url_page column
    df_without_url = df.drop('url_page')

    # Cleaning comment, title_page and name_user
    clean_udf = udf(cleantext, StringType())
    df_without_url = df_without_url.withColumn('clean_comment', clean_udf(df_without_url.comment)).drop('comment')
    df_without_url = df_without_url.withColumn('clean_title_page', clean_udf(df_without_url.title_page)).drop('title_page')
    df_without_url = df_without_url.withColumn('clean_name_user', clean_udf(df_without_url.name_user)).drop('name_user')
    
    # Clean the old and new text columns
    df_without_url = df_without_url.withColumn('clean_old_text', clean_udf(df_without_url.text_old))
    df_without_url = df_without_url.withColumn('clean_new_text', clean_udf(df_without_url.text_new))
    
    data = df_without_url.select(col("label"), col("clean_comment").alias("comment"), col("clean_title_page").alias("title_page"), col("clean_name_user").alias("name_user"), col("text_old"), col("text_new"), col("clean_old_text"), col("clean_new_text"))
    return data

def get_difference_column(df):
    difference_udf = udf(extract_differences, StringType())
    intermediate_col = df.withColumn('difference', difference_udf(df.text_old, df.text_new))
    intermediate_col = intermediate_col.drop('text_old')
    intermediate_col = intermediate_col.drop('text_new')
    return intermediate_col

def paste_words(list_of_words):
    return ' '.join([x for x in list_of_words])

######################################################################################
# FUNCTIONS TO SPIT DIFFERENCE COLUMN INTO ADDED AND REMOVED COLUMN
######################################################################################

def get_removed_col(col):
    for item in 'azertyuiopmlkjhgfdsqnbvcxw,;:=ùµ$^)àç!è§(é&1234567890\"\')|@#]}{[^-_':
            if item in col[:col.find('|SEPERATIONLINEADDEDREMOVED|')]:
                return col[:col.find('|SEPERATIONLINEADDEDREMOVED|')]
    return 'empty'

def get_added_col(col):
    for item in 'azertyuiopmlkjhgfdsqnbvcxw,;:=ùµ$^)àç!è§(é&1234567890\"\')|@#]}{[^-_':
            if item in col[col.find('|SEPERATIONLINEADDEDREMOVED|') + len('|SEPERATIONLINEADDEDREMOVED|') + 1:]:
                return col[col.find('|SEPERATIONLINEADDEDREMOVED|') + len('|SEPERATIONLINEADDEDREMOVED|') + 1:]
    return 'empty'

def split_difference_into_removed_added(df):
    get_removed_udf = udf(get_removed_col, StringType())
    df = df.withColumn('removed_words', get_removed_udf(df.difference))
    
    get_added_udf = udf(get_added_col, StringType())
    df = df.withColumn('added_words', get_added_udf(df.difference))
    
    df = df.drop('difference')
    
    return df

# 2. Preprocessing

In [7]:
# Get the data as spark dataframe
# --------------------------------- wiki_df = get_wiki_df()

In [34]:
# Get clean dataframe (cleaning of comment, title_page, name_user):
# --------------------------------- clean_df = get_clean_df(wiki_df)

# In order to get the actual difference column
# --------------------------------- df_with_difference = get_difference_column(clean_df)

# Example of a difference column of the first of 20 instances:
# difference column is in the form REMOVED PART |SEPERATIONLINEADDEDREMOVED| ADDED PART
# df_with_difference.show(400)

+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| label|             comment|          title_page|           name_user|      clean_old_text|      clean_new_text|          difference|
+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  safe|   fixed cite errors|timeline of the 2...|           john b123|short description...|short description...|last websitewww  ...|
|  safe|duplicate word re...|timeline of the 2...|             arjayay|short description...|short description...| |SEPERATIONLINEA...|
|  safe|         23 february|timeline of the 2...|      reddyhakky1998|see alsotimeline ...|see alsotimeline ...|february   |SEPER...|
|  safe|4 februaryadded w...|timeline of the 2...|  sebastianrueckoldt|see alsotimeline ...|see alsotimeline ...|world health orga...|
|  safe|removing duplicat...|list of art deco ...|  and

In [35]:
# 2 columns: removed and added with space in new function. + Clean text_old and text_new
# Split difference column into column 'removed' and column 'added'
# --------------------------------- final_df = split_difference_into_removed_added(df_with_difference)

# final_df.show(400)

+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| label|             comment|          title_page|           name_user|      clean_old_text|      clean_new_text|       removed_words|         added_words|
+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  safe|   fixed cite errors|timeline of the 2...|           john b123|short description...|short description...|   last websitewww  |cnbeta lastcnn  s...|
|  safe|duplicate word re...|timeline of the 2...|             arjayay|short description...|short description...|               empty|                the |
|  safe|         23 february|timeline of the 2...|      reddyhakky1998|see alsotimeline ...|see alsotimeline ...|         february   |           february |
|  safe|4 februaryadded w...|timeline of the 2...|  sebastianrue

In [36]:
# final_df.filter(df_removed_added_colls.label == 'vandal').show(100)

+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| label|             comment|          title_page|           name_user|      clean_old_text|      clean_new_text|       removed_words|         added_words|
+------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|vandal|                 add|            carnival|         75187130123|aboutthe celebrat...|aboutthe celebrat...|     eating grasss  |               empty|
|vandal|               empty|  2011 england riots|           922340146|short description...|short description...|title 2012 oli wa...|title 2011 englan...|
|vandal|               empty|history of united...|         20913123780|furtherforeign po...|furtherforeign po...|    i got to go p...|               empty|
|vandal|               empty|richard iii of en...|2a02c7f485dac0