In [1]:
import findspark
import os
findspark.init('/Users/K-Lo/spark-1.5.0')

In [2]:
import pyspark
conf = (pyspark.SparkConf()
    .setMaster('local')
    .setAppName('pyspark')
    .set("spark.executor.memory", "2g"))
sc = pyspark.SparkContext(conf=conf)

In [3]:
import re

In [4]:
'''
Based on SymSpell:

Originally written in C#:

// SymSpell: 1 million times faster through Symmetric Delete spelling correction algorithm
//
// The Symmetric Delete spelling correction algorithm reduces the complexity of edit candidate generation and dictionary lookup 
// for a given Damerau-Levenshtein distance. It is six orders of magnitude faster and language independent.
// Opposite to other algorithms only deletes are required, no transposes + replaces + inserts.
// Transposes + replaces + inserts of the input term are transformed into deletes of the dictionary term.
// Replaces and inserts are expensive and language dependent: e.g. Chinese has 70,000 Unicode Han characters!
//
// Copyright (C) 2015 Wolf Garbe
// Version: 3.0
// Author: Wolf Garbe <wolf.garbe@faroo.com>
// Maintainer: Wolf Garbe <wolf.garbe@faroo.com>
// URL: http://blog.faroo.com/2012/06/07/improved-edit-distance-based-spelling-correction/
// Description: http://blog.faroo.com/2012/06/07/improved-edit-distance-based-spelling-correction/
//
// License:
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License, 
// version 3.0 (LGPL-3.0) as published by the Free Software Foundation.
// http://www.opensource.org/licenses/LGPL-3.0
//
// Usage: single word + Enter:  Display spelling suggestions
//        Enter without input:  Terminate the program
'''
pass

In [5]:
n_partitions = 6  # number of partitions to be used

In [6]:
max_edit_distance = 3

In [7]:
# we generate and count all words for the corpus,
# then add deletes to the dictionary
# this is a slightly different approach from the Faroo algorithm
# that may be more appropriate for parallel processing

In [8]:
# general helper functions
def get_deletes_list(word):
    '''given a word, derive strings with one character deleted'''
    # takes a string as input and returns all 1-deletes in a list
    # allows for duplicates to be created, will deal with duplicates later to minimize shuffling
    if len(word)>1:
        return ([word[:c] + word[c+1:] for c in range(len(word))])
    else:
        return []

In [9]:
def copartitioned(RDD1, RDD2):
    '''check if two RDDs are copartitioned'''
    return RDD1.partitioner == RDD2.partitioner

In [10]:
def combine_joined_lists(tup):
    '''takes as input a tuple in the form (a, b) where each of a, b may be None (but not both) or a list
       and returns a concatenated list of unique elements'''
    concat_list = []
    if tup[1] is None:
        concat_list = tup[0]
    elif tup[0] is None:
        concat_list = tup[1]
    else:
        concat_list = tup[0] + tup[1]
        
    return list(set(concat_list))

In [11]:
def parallel_create_dictionary(fname):

    print "Creating dictionary..." 
    
    ############
    #
    # process corpus
    #
    ############
    
    print ">>> processing corpus words..."
    
    # http://stackoverflow.com/questions/22520932/python-remove-all-non-alphabet-chars-from-string
    regex = re.compile('[^a-z ]')

    # convert file into one long sequence of words
    make_all_lower = sc.textFile(fname).map(lambda line: line.lower())
    replace_nonalphs = make_all_lower.map(lambda line: regex.sub(' ', line))
    all_words = replace_nonalphs.flatMap(lambda line: line.split())

    # create core corpus dictionary (i.e. only words appearing in file, no "deletes") and cache it
    # output RDD of unique_words_with_count: [(word1, count1), (word2, count2), (word3, count3)...]
    count_once = all_words.map(lambda word: (word, 1))
    unique_words_with_count = count_once.reduceByKey(lambda a, b: a + b, numPartitions = n_partitions).cache()
    
    # output stats on core corpus
    print "total words processed: %i" % unique_words_with_count.map(lambda (k, v): v).reduce(lambda a, b: a + b)
    print "total unique words in corpus: %i" % unique_words_with_count.count()
    
    ############
    #
    # generate deletes list
    #
    ############
    
    # generate list of n-deletes from words in a corpus of the form: [(word1, count1), (word2, count2), ...]
    # we will handle possible duplicates after map/reduce:
    #     our thinking is the resulting suggestions lists for each delete will be much smaller than the
    #     list of potential deletes, and it is more efficient to reduce first, then remove duplicates 
    #     from these smaller lists (at each worker node), rather than calling `distinct()` on  
    #     flattened `expand_deletes` which would require a large shuffle

    ##
    ## generate 1-deletes
    ##
     
    assert max_edit_distance>0  
    print ">>> processing deletions from corpus..."
    
    generate_deletes = unique_words_with_count.map(lambda (parent, count): (parent, get_deletes_list(parent)), 
                                                      preservesPartitioning=True)
    expand_deletes = generate_deletes.flatMapValues(lambda x: x)
    
    # swap and combine, resulting RDD after processing 1-deletes has elements:
    # [(delete1, [correct1, correct2...]), (delete2, [correct1, correct2...])...]
    swap = expand_deletes.map(lambda (orig, delete): (delete, [orig]))
    combine = swap.reduceByKey(lambda a, b: a + b, numPartitions = n_partitions)

    # cache "master" deletes RDD, list of (deletes, [unique suggestions]), for use in loop
    deletes = combine.mapValues(lambda sl: list(set(sl))).cache()
    
    ##
    ## generate 2+ deletes
    ##
    
    d_remaining = max_edit_distance - 1  # decreasing counter
    queue = deletes

    while d_remaining>0:

        # generate further deletes
        #'expand_new_deletes' will be of the form [(parent "delete", [new child "deletes"]), ...]
        # n.b. this will filter out elements with no new child deletes
        gen_new_deletes = queue.map(lambda (x, y): (x, get_deletes_list(x)), preservesPartitioning=True)
        expand_new_deletes = gen_new_deletes.flatMapValues(lambda x: x)  

        # associate each new child delete with same corpus word suggestions that applied for parent delete
        # update queue with [(new child delete, [corpus suggestions]) ...] and cache for next iteration
        
        assert copartitioned(queue, expand_new_deletes)   # check partitioning for efficient join
        get_sugglist_from_parent = expand_new_deletes.join(queue)
        new_deletes = get_sugglist_from_parent.map(lambda (p, (c, sl)): (c, sl))
        combine_new = new_deletes.reduceByKey(lambda a, b: a + b, numPartitions = n_partitions)
        queue = combine_new.mapValues(lambda sl: list(set(sl))).cache()

        # update "master" deletes list with new deletes, and cache for next iteration
        
        assert copartitioned(deletes, queue)    # check partitioning for efficient join
        join_delete_lists = deletes.fullOuterJoin(queue)
        deletes = join_delete_lists.mapValues(lambda y: combine_joined_lists(y)).cache()

        d_remaining -= 1
        
    ############
    #
    # merge deletes with unique corpus words to construct main dictionary
    #
    ############

    # dictionary entries are in the form: (list of suggested corrections, frequency of word in corpus)
    # note frequency of word in corpus is not incremented for deletes
    deletes_for_dict = deletes.mapValues(lambda sl: (sl, 0)) 
    unique_words_for_dict = unique_words_with_count.mapValues(lambda count: ([], count))

    assert copartitioned(unique_words_for_dict, deletes_for_dict)  # check partitioning for efficient join
    join_deletes = unique_words_for_dict.fullOuterJoin(deletes_for_dict)
    '''
    entries now in form of (word, ( ([], count), ([suggestions], 0) )) for words in both corpus/deletes
                           (word, ( ([], count), None               )) for (real) words in corpus only
                           (word, ( None       , ([suggestions], 0) )) for (fake) words in deletes only
    '''

    # if entry has deletes and is a real word, take suggestion list from deletes and count from corpus
    dictionary_RDD = join_deletes.mapValues(lambda (xtup, ytup): 
                                                xtup if ytup is None
                                                else ytup if xtup is None
                                                else (ytup[0], xtup[1])).cache()

    print "total items in dictionary (corpus words and deletions): %i" % dictionary_RDD.count()
    print "  edit distance for deletions: %i" % max_edit_distance
    longest_word_length = unique_words_with_count.map(lambda (k, v): len(k)).reduce(max)
    print "  length of longest word in corpus: %i" % longest_word_length
        
    return dictionary_RDD, longest_word_length

In [12]:
%%time
#create_dictionary("/Users/K-Lo/Desktop/big.txt").collect()
a, lwl = parallel_create_dictionary("/Users/K-Lo/Desktop/big.txt")

Creating dictionary...
>>> processing corpus words...
total words processed: 1105285
total unique words in corpus: 29157
>>> processing deletions from corpus...
total items in dictionary (corpus words and deletions): 2151998
  edit distance for deletions: 3
  length of longest word in corpus: 18
CPU times: user 101 ms, sys: 26.3 ms, total: 127 ms
Wall time: 4min 41s


<div class="alert alert-info">
<b>For testing:</b> <p>
Can look up a specific entry in the dictionary below. <br>
shows (possible corrections, and frequency that entry itself is in corpus - 0 if not a real word)
</div>

In [None]:
%%time
a.lookup("the")

In [None]:
a.lookup("zzfftt")

In [None]:
a.take(2)

In [13]:
def dameraulevenshtein(seq1, seq2):
    """Calculate the Damerau-Levenshtein distance (an integer) between sequences (e.g. strings).

    Source: http://mwh.geek.nz/2009/04/26/python-damerau-levenshtein-distance/
    
    This distance is the number of additions, deletions, substitutions,
    and transpositions needed to transform the first sequence into the
    second. Although generally used with strings, any sequences of
    comparable objects will work.

    Transpositions are exchanges of *consecutive* characters; all other
    operations are self-explanatory.

    This implementation is O(N*M) time and O(M) space, for N and M the
    lengths of the two sequences.

    >>> dameraulevenshtein('ba', 'abc')
    2
    >>> dameraulevenshtein('fee', 'deed')
    2

    It works with arbitrary sequences too:
    >>> dameraulevenshtein('abcd', ['b', 'a', 'c', 'd', 'e'])
    2
    """
    # codesnippet:D0DE4716-B6E6-4161-9219-2903BF8F547F
    # Conceptually, this is based on a len(seq1) + 1 * len(seq2) + 1 matrix.
    # However, only the current and two previous rows are needed at once,
    # so we only store those.
    oneago = None
    thisrow = range(1, len(seq2) + 1) + [0]
    for x in xrange(len(seq1)):
        # Python lists wrap around for negative indices, so put the
        # leftmost column at the *end* of the list. This matches with
        # the zero-indexed strings and saves extra calculation.
        twoago, oneago, thisrow = oneago, thisrow, [0] * len(seq2) + [x + 1]
        for y in xrange(len(seq2)):
            delcost = oneago[y] + 1
            addcost = thisrow[y - 1] + 1
            subcost = oneago[y - 1] + (seq1[x] != seq2[y])
            thisrow[y] = min(delcost, addcost, subcost)
            # This block deals with transpositions
            if (x > 0 and y > 0 and seq1[x] == seq2[y - 1]
                and seq1[x-1] == seq2[y] and seq1[x] != seq2[y]):
                thisrow[y] = min(thisrow[y], twoago[y - 2] + 1)
    return thisrow[len(seq2) - 1]

In [14]:
def get_n_deletes_list(w, n):
    '''given a word, derive strings with up to n characters deleted'''
    deletes = []
    queue = [w]
    for d in range(n):
        temp_queue = []
        for word in queue:
            if len(word)>1:
                for c in range(len(word)):  # character index
                    word_minus_c = word[:c] + word[c+1:]
                    if word_minus_c not in deletes:
                        deletes.append(word_minus_c)
                    if word_minus_c not in temp_queue:
                        temp_queue.append(word_minus_c)
        queue = temp_queue
        
    return deletes

In [20]:
def parallel_get_suggestions(s, dictRDD, longest_word_length=float('inf'), silent=False):
    '''return list of suggested corrections for potentially incorrectly spelled word.
    
    s: input string
    dictRDD: the main dictionary, which includes deletes
             entries are in the form of: [(word, ([suggested corrections], frequency of word in corpus)), ...]
    longest_word_length: optional identifier of longest real word in dictRDD
    silent: verbose output
    '''
    
    if (len(s) - longest_word_length) > max_edit_distance:
        if not silent:
            print "no items in dictionary within maximum edit distance"
        return []

    ##########
    #
    # initialize suggestions RDD
    # suggestRDD entries: (word, (frequency of word in corpus, edit distance))
    #
    ##########
    
    if not silent:
        print ">>> looking up suggestions based on input word..."
    
    # ensure input RDDs are partitioned
    dictRDD = dictRDD.partitionBy(n_partitions).cache()
    
    # check if input word is in dictionary, and is a word from the corpus (edit distance = 0)
    # if so, add input word itself to suggestRDD
    exact_match = dictRDD.filter(lambda (w, (sl, freq)): w==s).cache()
    suggestRDD = exact_match.mapValues(lambda (sl, freq): (freq, 0)).cache()

    ##########
    #
    # add suggestions for input word
    #
    ##########
    
    # the suggested corrections for the item in dictionary (whether or not
    # the input string s itself is a valid word or merely a delete) can be valid corrections
    sc_items = exact_match.flatMap(lambda (w, (sl, freq)): sl)
    calc_dist = sc_items.map(lambda sc: (sc, len(sc)-len(s))).partitionBy(n_partitions).cache()
    
    assert copartitioned(dictRDD, calc_dist)  # check partitioning for efficient join
    get_freq = dictRDD.join(calc_dist)
    parent_sugg = get_freq.mapValues(lambda ((sl, freq), dist): (freq, dist))
    suggestRDD = suggestRDD.union(parent_sugg).cache()
    assert copartitioned(parent_sugg, suggestRDD)  # check partitioning

    ##########
    #
    # process deletes on the input string
    #
    ##########
     
    assert max_edit_distance>0
    if not silent:
        print ">>> processing deletions for input word..."
    
    list_deletes_of_s = sc.parallelize(get_n_deletes_list(s, max_edit_distance))
    deletes_of_s = list_deletes_of_s.map(lambda k: (k, 0)).partitionBy(n_partitions).cache()
    
    assert copartitioned(dictRDD, deletes_of_s) # check partitioning for efficient join
    check_matches = dictRDD.join(deletes_of_s).cache()
    
    # if delete is a real word in corpus, add it to suggestion list
    del_exact_match = check_matches.filter(lambda (w, ((sl, freq), _)): freq>0)
    del_sugg = del_exact_match.map(lambda (w, ((s1, freq), _)): (w, (freq, len(s)-len(w))),
                                   preservesPartitioning=True)
    suggestRDD = suggestRDD.union(del_sugg).cache()
    
    # the suggested corrections for the item in dictionary (whether or not
    # the delete itself is a valid word or merely a delete) can be valid corrections    
    list_sl = check_matches.mapValues(lambda ((sl, freq), _): sl).flatMapValues(lambda x: x)
    swap_del = list_sl.map(lambda (w, sc): (sc, 0))
    combine_del = swap_del.reduceByKey(lambda a, b: a + b, numPartitions = n_partitions).cache()

    # need to recalculate actual Deverau Levenshtein distance to be within max_edit_distance for all deletes
    calc_dist = combine_del.map(lambda (w, _): (w, dameraulevenshtein(s, w)),
                                       preservesPartitioning=True)
    filter_by_dist = calc_dist.filter(lambda (w, dist): dist<=max_edit_distance)
    
    # get frequencies from main dictionary and add to suggestions list
    assert copartitioned(dictRDD, filter_by_dist)  # check partitioning for efficient join
    get_freq = dictRDD.join(filter_by_dist)
    del_parent_sugg = get_freq.mapValues(lambda ((sl, freq), dist): (freq, dist))
    
    suggestRDD = suggestRDD.union(del_parent_sugg).distinct().cache()    
    
    if not silent:
        print "number of possible corrections: %i" %suggestRDD.count()
        print "  edit distance for deletions: %i" % max_edit_distance

    ##########
    #
    # sort RDD for output
    #
    ##########
    
    # suggest_RDD is in the form: [(word, (freq, editdist)), (word, (freq, editdist)), ...]
    # there does not seem to be a straightforward way to sort by both primary and secondary keys in Spark
    # this is a documented issue: one option is to simply work with a list since there are likely not
    # going to be an extremely large number of recommended suggestions
    
    output = suggestRDD.collect()
    
    # output option 1
    # sort results by ascending order of edit distance and descending order of frequency
    #     and return list of suggested corrections only:
    # return sorted(output, key = lambda x: (suggest_dict[x][1], -suggest_dict[x][0]))

    # output option 2
    # return list of suggestions with (correction, (frequency in corpus, edit distance)):
    # return sorted(output, key = lambda (term, (freq, dist)): (dist, -freq))

    return sorted(output, key = lambda (term, (freq, dist)): (dist, -freq))

In [21]:
%%time
parallel_get_suggestions("there", a, lwl)

>>> looking up suggestions based on input word...
>>> processing deletions for input word...
number of possible corrections: 604
  edit distance for deletions: 3
CPU times: user 82.2 ms, sys: 22.1 ms, total: 104 ms
Wall time: 1min 39s


[(u'there', (2972, 0)),
 (u'these', (1231, 1)),
 (u'where', (977, 1)),
 (u'here', (691, 1)),
 (u'three', (584, 1)),
 (u'thee', (26, 1)),
 (u'chere', (9, 1)),
 (u'theme', (8, 1)),
 (u'the', (80030, 2)),
 (u'her', (5284, 2)),
 (u'were', (4289, 2)),
 (u'they', (3938, 2)),
 (u'their', (2955, 2)),
 (u'them', (2241, 2)),
 (u'then', (1558, 2)),
 (u'other', (1502, 2)),
 (u'those', (1201, 2)),
 (u'others', (410, 2)),
 (u'third', (239, 2)),
 (u'term', (133, 2)),
 (u'threw', (96, 2)),
 (u'mere', (79, 2)),
 (u'theory', (79, 2)),
 (u'share', (69, 2)),
 (u'hero', (55, 2)),
 (u'tree', (42, 2)),
 (u'hare', (36, 2)),
 (u'thereby', (32, 2)),
 (u'sphere', (31, 2)),
 (u'hers', (30, 2)),
 (u'thereof', (26, 2)),
 (u'cher', (25, 2)),
 (u'tore', (18, 2)),
 (u'herd', (15, 2)),
 (u'theirs', (14, 2)),
 (u'thiers', (13, 2)),
 (u'shore', (11, 2)),
 (u'thence', (10, 2)),
 (u'tete', (9, 2)),
 (u'sheer', (8, 2)),
 (u'adhere', (8, 2)),
 (u'ether', (8, 2)),
 (u'tver', (7, 2)),
 (u'therein', (6, 2)),
 (u'tier', (5, 2)),