# Featured Activity

##### Setting up the Spark Context

We will use Python to implement the word co-occurrence problem in Apache Spark. This requires the 'pyspark' library, which lets us write Spark programs in Python.

In [30]:
import pyspark

In [31]:
# Reuse the running SparkContext (e.g. the one the PySpark shell creates) or start a new one.
# Calling pyspark.SparkContext() while a context already exists fails with:
#   ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=PySparkShell, master=local[*])
sc = pyspark.SparkContext.getOrCreate()

##### Input

The input is read from a folder named 'Input'; every file in that folder is used as input to the program.

In [95]:
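textFile = sc.textFile("Input")                 # one RDD element per line, from every file in 'Input'
textFile = textFile.filter(lambda x: x != u'')  # drop empty lines (compare by value, not by identity)
# textFile is our RDD and we will extract all the required information from this RDD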

##### Creating a Dictionary for Lemma Lookup

We will create a dictionary, Python's equivalent of a hash map, and use it to look up the lemmas of each word.

In [96]:
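import csv

mydict = dict()   # maps a word to its lemmas, stored as one space-separated string
with open("new_lemmatizer.csv") as csvfile:
    readCSV = csv.reader(csvfile, delimiter=',')
    for row in readCSV:
        if not row:                    # skip blank lines in the CSV
            continue
        k = row[0]                     # first column is the lookup key
        temp = ""
        for i in range(1, len(row)):   # remaining columns are the lemmas
            if row[i] != "":
                temp += row[i] + " "
        mydict[k] = temp               # the trailing space is stripped at lookup time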
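
To make the lookup structure concrete, here is a minimal sketch that builds the same kind of dictionary from an in-memory CSV instead of 'new_lemmatizer.csv'. The sample rows are invented for illustration and are not taken from the real lemmatizer file, and the helper name `build_lemma_dict` is hypothetical. Unlike the cell above, it stores the lemmas as a list rather than as a space-separated string.

```python
import csv
import io

def build_lemma_dict(csv_text):
    """Map each word (first column) to a list of its non-empty lemmas."""
    lemma_dict = {}
    for row in csv.reader(io.StringIO(csv_text), delimiter=','):
        if not row:                 # skip blank lines
            continue
        lemma_dict[row[0]] = [lemma for lemma in row[1:] if lemma != ""]
    return lemma_dict

# Invented sample rows: a word, then its lemmas (empty cells are padding).
sample = "amauit,amo,\ncanis,canis,cano\n"
lemmas = build_lemma_dict(sample)
print(lemmas["canis"])   # ['canis', 'cano']
print(lemmas["amauit"])  # ['amo']
```

Storing a list avoids the strip-and-split step at lookup time; the notebook keeps a string, so its mappers split it on every access.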

##### Word Co-occurrence Bi-grams (n=2)

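The comments in the code explain the overall flow of the program: the mapper splits each line into its location tag and text, normalizes the words and looks up their lemmas, and emits a (word pair, location) tuple for every pair of words in the line; the reducer concatenates the locations of identical pairs.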

In [97]:
import time

start_time = time.time()

def myMapper(line):
    splitLocationText=line.split(">",1)     # Split (<location> Text) at the first ">" only, as (<location || Text)
    location=splitLocationText[0]+">"       # Now we get location as <location>
    finalLine=splitLocationText[1].strip()  # Trimming the line
    tokens=finalLine.split(" ")             # 1 token= 1 word in line
    keyValList=[]                           # To store the keyValues and locations
    for i in range(0,len(tokens)-1):
        normalizedWord=tokens[i].replace('j','i')
        normalizedWord=normalizedWord.replace('J','I')
        normalizedWord=normalizedWord.replace('v','u')
        normalizedWord=normalizedWord.replace('V','U') # Normalize words by substituting 'j' by 'i' and 'v' by 'u'
        if normalizedWord in mydict:        # dictionary contains word
            wordLemma=mydict.get(normalizedWord).strip()  # remove all extra spaces in strings
            wordLemma=wordLemma.split(" ")                  # we will get an array of lemmas for word
            for x in range(0,len(wordLemma)):          # for each lemma of word
                    for j in range(i+1,len(tokens)):
                        normalizedNeighbour=tokens[j].replace('j','i')
                        normalizedNeighbour=normalizedNeighbour.replace('J','I')
                        normalizedNeighbour=normalizedNeighbour.replace('v','u') # Normalize neighbour
                        normalizedNeighbour=normalizedNeighbour.replace('V','U')
                        if normalizedNeighbour in mydict:     # dictionary contains neighbour
                            neighbour1Lemma=mydict.get(normalizedNeighbour).strip()   # remove extra spaces
                            neighbour1Lemma=neighbour1Lemma.split(" ")
                            for y in range(0,len(neighbour1Lemma)):     # for each lemma of neighbour, append (lemma(normalizedWord),lemma(normalizedNeighbour):location)
                                finalKeyVal=wordLemma[x]+" "+neighbour1Lemma[y]
                                keyValList.append((finalKeyVal,location))
                        else:
                            finalKeyVal=wordLemma[x]+" "+normalizedNeighbour # if dictionary doesn't contain neighbour, append (lemma(normalizedWord),normalizedNeighbour:location)
                            keyValList.append((finalKeyVal,location))
                                        
        else:                                 # word has no lemma: append (normalizedWord,normalizedNeighbour:location); neighbours are not lemmatized in this branch
            for j in range(i+1,len(tokens)):
                normalizedNeighbour=tokens[j].replace('j','i')
                normalizedNeighbour=normalizedNeighbour.replace('J','I')
                normalizedNeighbour=normalizedNeighbour.replace('v','u') # Normalize neighbour
                normalizedNeighbour=normalizedNeighbour.replace('V','U')
                finalKeyVal=normalizedWord+" "+normalizedNeighbour
                keyValList.append((finalKeyVal,location))  # we create a tuple (key,val) and append this in our list
                        
    return keyValList  # return final list of all key-values

output=textFile.flatMap(myMapper)          # Mapper method called
output.saveAsTextFile("BiGramsMapperOutput")

output1=output.reduceByKey(lambda a,b:a+","+b)  # Reducer method called
output1.saveAsTextFile("BiGramsReducerOutput")


print("--- %s seconds ---" % (time.time() - start_time))

--- 7.95885300636 seconds ---
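
The nested loops above can be read as three small steps: normalize a token, expand it into its lemmas, and pair it with every later token in the line. The following is a minimal plain-Python sketch of those steps and of the reducer, runnable without Spark. It is not the notebook's code: the names `normalize`, `lemmas_or_self`, `bigram_pairs`, and `reduce_by_key`, the tiny `toy_dict`, and the sample lines are all assumptions made for illustration. Like `myMapper`, it lemmatizes neighbours only when the first word itself has a lemma.

```python
def normalize(token):
    """Replace j/J with i/I and v/V with u/U, as the mapper does."""
    for old, new in (("j", "i"), ("J", "I"), ("v", "u"), ("V", "U")):
        token = token.replace(old, new)
    return token

def lemmas_or_self(token, lemma_dict):
    """Return the token's lemmas if it is in the dictionary, else the token itself."""
    if token in lemma_dict:
        return lemma_dict[token].strip().split(" ")
    return [token]

def bigram_pairs(line, lemma_dict):
    """Emit (key, location) tuples for every pair of tokens (i < j) in one line."""
    location, text = line.split(">", 1)
    location += ">"
    tokens = [normalize(t) for t in text.strip().split(" ")]
    pairs = []
    for i in range(len(tokens) - 1):
        word_known = tokens[i] in lemma_dict
        for first in lemmas_or_self(tokens[i], lemma_dict):
            for j in range(i + 1, len(tokens)):
                # Neighbours are expanded to lemmas only if the word has a lemma.
                seconds = lemmas_or_self(tokens[j], lemma_dict) if word_known else [tokens[j]]
                for second in seconds:
                    pairs.append((first + " " + second, location))
    return pairs

def reduce_by_key(pairs):
    """Plain-Python stand-in for reduceByKey(lambda a, b: a + "," + b)."""
    merged = {}
    for key, location in pairs:
        merged[key] = merged[key] + "," + location if key in merged else location
    return merged

toy_dict = {"uir": "uir", "cano": "cano canus"}   # invented entries
pairs = bigram_pairs("<toy 1.1> vir cano", toy_dict)
print(pairs)   # [('uir cano', '<toy 1.1>'), ('uir canus', '<toy 1.1>')]
```

Passing the pairs of several lines to `reduce_by_key` yields one comma-separated location string per key, which is the shape of the data written to 'BiGramsReducerOutput'.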


##### Word Co-occurrence Tri-grams (n=3)

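As before, the comments in the code explain the overall flow of the program. The only difference is that the mapper emits a key for every group of three words in a line instead of every pair.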

In [98]:
import time

start_time = time.time()

def myMapperForThreeWords(line):
    splitLocationText=line.split(">",1)     # Split (<location> Text) at the first ">" only, as (<location || Text)
    location=splitLocationText[0]+">"       # Now we get location as <location>
    finalLine=splitLocationText[1].strip()  # Trimming the line
    tokens=finalLine.split(" ")             # 1 token= 1 word in line
    keyValList=[]                           # To store the keyValues and locations
    
    for i in range(0,len(tokens)-2):
        normalizedWord=tokens[i].replace('j','i')
        normalizedWord=normalizedWord.replace('J','I')
        normalizedWord=normalizedWord.replace('v','u')
        normalizedWord=normalizedWord.replace('V','U') # Normalize words by substituting 'j' by 'i' and 'v' by 'u'
        if normalizedWord in mydict:        # dictionary contains word
            wordLemma=mydict.get(normalizedWord).strip() # remove extra spaces
            wordLemma=wordLemma.split(" ") # we will get an array of lemmas for word
            for x in range(0,len(wordLemma)):
                for j in range(i+1,len(tokens)-1):
                    normalizedNeighbour=tokens[j].replace('j','i')
                    normalizedNeighbour=normalizedNeighbour.replace('J','I')
                    normalizedNeighbour=normalizedNeighbour.replace('v','u') # Normalize neighbour
                    normalizedNeighbour=normalizedNeighbour.replace('V','U')
                    if normalizedNeighbour in mydict:     # dictionary contains neighbour
                        neighbour1Lemma=mydict.get(normalizedNeighbour).strip()
                        neighbour1Lemma=neighbour1Lemma.split(" ")
                        for y in range(0,len(neighbour1Lemma)):
                            for k in range(j+1,len(tokens)):
                                normalizedNeighbour2=tokens[k].replace('j','i')
                                normalizedNeighbour2=normalizedNeighbour2.replace('J','I')
                                normalizedNeighbour2=normalizedNeighbour2.replace('v','u') # Normalize neighbour2
                                normalizedNeighbour2=normalizedNeighbour2.replace('V','U')
                                if normalizedNeighbour2 in mydict:  # lemmas for neighbour2
                                    neighbour2Lemma=mydict.get(normalizedNeighbour2).strip()
                                    neighbour2Lemma=neighbour2Lemma.split(" ")
                                    for z in range(0,len(neighbour2Lemma)):
                                        finalKeyVal=wordLemma[x]+" "+neighbour1Lemma[y]+" "+neighbour2Lemma[z]    # append (lemma(normalizedWord),lemma(normalizedNeighbour1),lemma(normalizedNeighbour2):location)                   
                                        keyValList.append((finalKeyVal,location))
                                else:
                                    finalKeyVal=wordLemma[x]+" "+neighbour1Lemma[y]+" "+normalizedNeighbour2      # append (lemma(normalizedWord),lemma(normalizedNeighbour1),normalizedNeighbour2:location) 
                                    keyValList.append((finalKeyVal,location))
                    else:  # neighbour1 is not present in dictionary
                        for k in range(j+1,len(tokens)):
                            normalizedNeighbour2=tokens[k].replace('j','i')
                            normalizedNeighbour2=normalizedNeighbour2.replace('J','I')
                            normalizedNeighbour2=normalizedNeighbour2.replace('v','u') # Normalize neighbour2
                            normalizedNeighbour2=normalizedNeighbour2.replace('V','U')
                            if normalizedNeighbour2 in mydict:
                                neighbour2Lemma=mydict.get(normalizedNeighbour2).strip()
                                neighbour2Lemma=neighbour2Lemma.split(" ")
                                for z in range(0,len(neighbour2Lemma)):
                                    finalKeyVal=wordLemma[x]+" "+normalizedNeighbour+" "+neighbour2Lemma[z]  # append (lemma(normalizedWord),normalizedNeighbour1,lemma(normalizedNeighbour2):location) 
                                    keyValList.append((finalKeyVal,location))
                            else:
                                finalKeyVal=wordLemma[x]+" "+normalizedNeighbour+" "+normalizedNeighbour2    # append (lemma(normalizedWord),normalizedNeighbour1,normalizedNeighbour2:location) 
                                keyValList.append((finalKeyVal,location))
        else:
            for j in range(i+1,len(tokens)-1):
                normalizedNeighbour=tokens[j].replace('j','i')
                normalizedNeighbour=normalizedNeighbour.replace('J','I')
                normalizedNeighbour=normalizedNeighbour.replace('v','u') # Normalize neighbour
                normalizedNeighbour=normalizedNeighbour.replace('V','U')
                if normalizedNeighbour in mydict:
                    neighbour1Lemma=mydict.get(normalizedNeighbour).strip()
                    neighbour1Lemma=neighbour1Lemma.split(" ")
                    for y in range(0,len(neighbour1Lemma)):
                        for k in range(j+1,len(tokens)):
                            normalizedNeighbour2=tokens[k].replace('j','i')
                            normalizedNeighbour2=normalizedNeighbour2.replace('J','I')
                            normalizedNeighbour2=normalizedNeighbour2.replace('v','u') # Normalize neighbour2
                            normalizedNeighbour2=normalizedNeighbour2.replace('V','U')
                            if normalizedNeighbour2 in mydict:
                                neighbour2Lemma=mydict.get(normalizedNeighbour2).strip()
                                neighbour2Lemma=neighbour2Lemma.split(" ")
                                for z in range(0,len(neighbour2Lemma)):
                                    finalKeyVal=normalizedWord+" "+neighbour1Lemma[y]+" "+neighbour2Lemma[z]  # append (normalizedWord,lemma(normalizedNeighbour1),lemma(normalizedNeighbour2):location) 
                                    keyValList.append((finalKeyVal,location))
                            else:
                                finalKeyVal=normalizedWord+" "+neighbour1Lemma[y]+" "+normalizedNeighbour2    # append (normalizedWord,lemma(normalizedNeighbour1),normalizedNeighbour2:location) 
                                keyValList.append((finalKeyVal,location))
                else:
                    for k in range(j+1,len(tokens)):
                        normalizedNeighbour2=tokens[k].replace('j','i')
                        normalizedNeighbour2=normalizedNeighbour2.replace('J','I')
                        normalizedNeighbour2=normalizedNeighbour2.replace('v','u') # Normalize neighbour2
                        normalizedNeighbour2=normalizedNeighbour2.replace('V','U')
                        if normalizedNeighbour2 in mydict:
                            neighbour2Lemma=mydict.get(normalizedNeighbour2).strip()
                            neighbour2Lemma=neighbour2Lemma.split(" ")
                            for z in range(0,len(neighbour2Lemma)):
                                finalKeyVal=normalizedWord+" "+normalizedNeighbour+" "+neighbour2Lemma[z]    # append (normalizedWord,normalizedNeighbour1,lemma(normalizedNeighbour2):location) 
                                keyValList.append((finalKeyVal,location))
                        else:
                            finalKeyVal=normalizedWord+" "+normalizedNeighbour+" "+normalizedNeighbour2        # append (normalizedWord,normalizedNeighbour1,normalizedNeighbour2:location) 
                            keyValList.append((finalKeyVal,location))       
            
    return keyValList

output2=textFile.flatMap(myMapperForThreeWords)
output2.saveAsTextFile("TriGramsMapperOutput")
output3=output2.reduceByKey(lambda a,b:a+","+b)
output3.saveAsTextFile("TriGramsReducerOutput")

print("--- %s seconds ---" % (time.time() - start_time))

--- 24.6293430328 seconds ---
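
The trigram mapper repeats the same lookup for each of its three positions. As a possible simplification, the sketch below generalizes the idea to any n using `itertools.combinations` (every group of n tokens, in line order) and `itertools.product` (every combination of their lemmas). It is an illustrative alternative, not the notebook's implementation: `candidate_forms`, `ngram_keys`, `toy_dict`, and the sample line are hypothetical. For n=3 it should produce the same keys as `myMapperForThreeWords`, possibly in a different order; for n=2 it lemmatizes neighbours even when the first word has no lemma, which `myMapper` does not.

```python
from itertools import combinations, product

def normalize(token):
    """Replace j/J with i/I and v/V with u/U."""
    for old, new in (("j", "i"), ("J", "I"), ("v", "u"), ("V", "U")):
        token = token.replace(old, new)
    return token

def candidate_forms(token, lemma_dict):
    """Return the lemmas of the normalized token, or the normalized token itself."""
    word = normalize(token)
    if word in lemma_dict:
        return lemma_dict[word].strip().split(" ")
    return [word]

def ngram_keys(line, lemma_dict, n):
    """Emit (key, location) tuples for every group of n tokens in one line."""
    location, text = line.split(">", 1)
    location += ">"
    forms = [candidate_forms(t, lemma_dict) for t in text.strip().split(" ")]
    result = []
    for group in combinations(forms, n):      # positions i < j < k < ...
        for choice in product(*group):        # one form per position
            result.append((" ".join(choice), location))
    return result

toy_dict = {"cano": "cano canus"}   # invented entry
trigrams = ngram_keys("<toy 1.1> arma virumque cano", toy_dict, 3)
print(trigrams)
# [('arma uirumque cano', '<toy 1.1>'), ('arma uirumque canus', '<toy 1.1>')]
```

Because the function returns a list of tuples, it could be passed to `flatMap` through a small wrapper that fixes `lemma_dict` and `n`, in the same way the mappers above are used.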
