In [108]:
import pyspark
from pyspark import SparkContext

In [2]:
sc = SparkContext(appName="task6")

In [5]:
INPUT_FILE_PATH = "Rec1_F19_Files_to_zip/file01_Hd_Sp_Freq"

# MapReduce Functions

## Procedure:

#### The problem will be broken down into three phases which are explained below:

Given document will be broken down line by line with spark. <br>
There will be two identical RDDs created from the document.<br>
The two RDDs will be used to determine line numbers. One RDD (RDD_REF) <br>
will simply be turned into a list, with each element being a line<br>
this way the index will correspond to line number. The other line<br>
will be mapped to its line index using RDD_REF (see mapLineWithIndex() function below). <br>
Now we have lines with their corresponding line number which will allow us to rebuild<br>
the document as it was and without duplicates.

The next phase is to split each line's words and get rid of duplicates. flatMap will <br>
be used to get every word in the document with both their line number and position<br>
in that line. After which duplicates will be removed using series of group and map<br>
operations (see RDD operation chain and respective functions). Only non-duplicates will<br>
be present in the RDD now, each with their line and position within the line.

All the words will be grouped and sorted by line index. All words,positionInLine pairs<br>
will be grouped in their respective lines. A final reducer will create a single string<br>
that represents the recreated document without any duplicates, where each word will <br>
retain its original position in the line.

In [114]:
r"""

Code Organization:

    Section_1 : Create RDDs and create lines list
    Section_2 : Define Functions to be used by RDD operations like map
    Section_3 : RDD function chain to obtain ouptut
    Section_4 : Process output to be printed to a file

"""

# Section_1:
lines = sc.textFile(INPUT_FILE_PATH)
linesRef = sc.textFile(INPUT_FILE_PATH)
linesRef = linesRef.collect()


# Section_2: Most Functions contain documentation explaining their tasks
def getRidOfDupes(termTuple):
    r"""
        Gets rid of duplicate words while retaining the first occurrence.
    
    @param:
        termTuple: contains the word and the line and inLine index
            - [0] : word
            - [1] : list((line,inLine))

    @return:
        tuple with word and indicies of is first occurrence
            - [0] : word
            - [1] : (line,inLine)

    @Procedure:
        Go through list of (line,inLine) for the given word and then keep the
        tuples that have the lowest "line" value. After that keep only the
        element with the lowest "inLine" value. Then return the word with
        firstLine and firstInLinePosition.

    """
    key = termTuple[0]
    occurrences =  list(map(lambda x: (x[0],x[1]),termTuple[1])) # copy over the termTuple list to local list; couldn't process otherwise
    termIndices = []

    #get all of the line values for each word into a single list to get the minimum value
    for i in occurrences:
        termIndices.append(i[0])
    firstLineOccurrence = min(termIndices)
    
    #get all of the inLine values belonging to the minimum line into one list to get minimum
    inLineOccurrences = []
    for j in occurrences:
        if j[0] == firstLineOccurrence :
            inLineOccurrences.append(j[1])
    firstInLineOccurrence = min(inLineOccurrences)

    return key,(firstLineOccurrence,firstInLineOccurrence)


def mapLineWithLineIndex(line):
    r"""
        Get Index for each line.

    @param:
        line : line from document

    @return:
        tuple : (line,lineIndex)

    @Procedure:
        Give each line its index, position in the document.
        This is done by using an already existing list with
        each line, the index of that list serving as the 
        line index. The input line is checked against each
        line in the list and if there is a match then that
        index is used for the input line.

    """
    for i in range(len(linesRef)):
        if line == linesRef[i]:
            return (line,i)

def splitLineWithLineIndex(line):
    r"""
        "line" here is a tuple with two elements.
        line[0] = the actual line
        line[1] = index of the line
    """
    tokenized = line[0].split(" ")
    outputList = []
    for i in range(len(tokenized)):
        token = tokenized[i]
        outputList.append((token.lower(),(line[1],i)))
    return outputList

def reAlignByLines(token):
    r"""
    @param:
        token:
            - [0] : word
            - [1] : tuple with line and inLine position
                -[0] : line index
                -[1] : in linke index
    @return:
        tuple: (line,(word,positionInLine))
    """
    word = token[0]
    line = token[1][0]
    postitionInLine = token[1][1]
    return line,(word,postitionInLine)

def mergeLineContent(line):
    r"""
        Combines everyword for a single line into a single list

    @param:
        "line" here is the following:
            - line[0] = the line number
            - line[1] = iterator that has elements: tuple(word,positionInLine)
                - in the following function x[0] = word and x[1]= positionInLine
    @return:
        tuple: (lineNumber,listOfSortedList)
    """
    lineNum = line[0]
    wordList = list(map(lambda x: (x[0],x[1]),line[1]))
    wordList.sort(key=lambda x: x[1])
    sortedWordList = list(map(lambda kvPair: kvPair[0],wordList))
    return lineNum,sortedWordList

def reduceToSolution(acc,current):
    r"""
        Reduces RDD to single string that has answer.
    """
    stringList_acc = acc[1]
    stringList_acc.append('\n')
    stringList_current = current[1]
    for i in stringList_current:
        stringList_acc.append(i)
    return acc


# Section_3 :
r"""
    Each line has a separate RDD operation to process data.
    Some of these functions are passed a predefined function
    as input instead of lambda. The RDD operations and their
    purpose are as follows:

        -map        -> Uses: mapLineWithLineIndex, map line to its index (position in document) -> (line,lineIndex)
 
        -flatMap    -> Uses: splitLineWithLineIndex, tokenizes each line and creates a list of words with the line index and position in line -> list(word,(lineIndex,                       position))
        
        -groupByKey -> Groups by word, so we can detect duplicates -> word,list((lineIndex,positionInLine))
        
        -map        -> Uses: getRidOfDupes(), Gets rid of duplicate words while retaining the 1st occurrence -> word,(lineIndex,positionInLine) (1st occurrence for                          line and inLine)
        
        -map        -> Uses: reAlignByLines(), Change key value to make line the key -> (lineIndex,(word,positionInLine))
        
        -groupByKey -> Groups all word to their respective line -> (lineIndex,list((word,positionInLine),...))
        
        -sortByKey  -> Sorts the lines by index so lines that appear first will be first in RDD
        
        -map        -> Uses: mergeLineContent(), Merges all words that belong to each line retaining the order of the words in the line-> (lineIndex,list                                    (wordsInOrder))
        
        -reduce     -> Uses: reduceToSolution(), Combines all lines and the words within each line; a '\n' placed between each line so when printed to txt doc there                         will be a line break -> String
"""
noDupes = lines.map(lambda line: mapLineWithLineIndex(line))\
                .flatMap(splitLineWithLineIndex)\
                .groupByKey()\
                .map(getRidOfDupes)\
                .map(reAlignByLines)\
                .groupByKey()\
                .sortByKey()\
                .map(mergeLineContent)\
                .reduce(reduceToSolution)

# Section_4 : 
noDupesRdd = sc.parallelize(noDupes[1])
noDupesRdd.saveAsTextFile("Problem06_NoDuplicates_Output")
print(noDupesRdd.collect())

['hello', 'world', 'bye', 'how', 'are', 'you', 'what', 'is', 'your', 'name', '', '\n', 'i', 'study', 'spark', 'and', 'also', 'hadoop', '\n', 'prefer', 'to', '\n', 'but', 'have', 'spend', 'more', 'hours', 'studying', '\n', 'difficult', 'compile', 'debug', 'some', 'say', '\n', 'am', 'interested', 'in', 'marreduce', 'which', 'part', 'of', '\n', 'so', 'shall', 'choose', '\n', 'perhaps', 'can', 'compromise', "let's", 'forget', 'about', 'it', 'all', 'do', 'sql', '\n', 'cloud', 'computing', '\n', 'supposed', 'get', 'familiar', 'with', 'nosql', 'pighive', '\n', 'much', '\n', 'oh', 'it.', "i'll", 'only', 'as', 'this', 'started', '\n', 'no,', "don't", 'want', 'be', 'modern?', 'then', "i'd", 'better']


# Output

In [115]:
#format the output to match the input format:
outptuString = ""
for token in noDupesRdd.collect():
    outptuString += token + " "
print(outptuString)

hello world bye how are you what is your name  
 i study spark and also hadoop 
 prefer to 
 but have spend more hours studying 
 difficult compile debug some say 
 am interested in marreduce which part of 
 so shall choose 
 perhaps can compromise let's forget about it all do sql 
 cloud computing 
 supposed get familiar with nosql pighive 
 much 
 oh it. i'll only as this started 
 no, don't want be modern? then i'd better 


In [116]:
sc.stop()