In [118]:
from pyspark.sql import SparkSession
from operator import add
import pandas as pd
import string

spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.250:7077") \
        .appName("Nikitha_Krishna Murthy_lab 3")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","300s")\
        .config("spark.executor.cores",2)\
        .config("spark.driver.port",9999)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()
spark_context = spark_session.sparkContext

spark_context.setLogLevel("ERROR")



In [119]:
#reading the english transcript lines for danish language and then counting number of lines
RDD_EN = spark_context.textFile("hdfs://192.168.2.250:9000/europarl/europarl-v7.da-en.en")
print(RDD_EN.count())



1968800


                                                                                

In [120]:
#reading the danish transcript lines for danish language and then counting number of lines
RDD_DA = spark_context.textFile("hdfs://192.168.2.250:9000/europarl/europarl-v7.da-en.da")
print(RDD_DA.count())



1968800


                                                                                

In [4]:
RDD_EN.getNumPartitions()#number of partitions for first RDD

3

In [11]:
RDD_DA.getNumPartitions()#number of partitions for first RDD


3

In [121]:
def tokenizer_lower(lines):
    X=lines.translate(str.maketrans('','',string.punctuation))
    return X.lower().split()

In [122]:
#printing first 10 lines preprocessed English transcript in lowercase and after split
preprocess_EN=RDD_EN.map(tokenizer_lower)
for line in preprocess_EN.take(10):
    print(line)

['resumption', 'of', 'the', 'session']
['i', 'declare', 'resumed', 'the', 'session', 'of', 'the', 'european', 'parliament', 'adjourned', 'on', 'friday', '17', 'december', '1999', 'and', 'i', 'would', 'like', 'once', 'again', 'to', 'wish', 'you', 'a', 'happy', 'new', 'year', 'in', 'the', 'hope', 'that', 'you', 'enjoyed', 'a', 'pleasant', 'festive', 'period']
['although', 'as', 'you', 'will', 'have', 'seen', 'the', 'dreaded', 'millennium', 'bug', 'failed', 'to', 'materialise', 'still', 'the', 'people', 'in', 'a', 'number', 'of', 'countries', 'suffered', 'a', 'series', 'of', 'natural', 'disasters', 'that', 'truly', 'were', 'dreadful']
['you', 'have', 'requested', 'a', 'debate', 'on', 'this', 'subject', 'in', 'the', 'course', 'of', 'the', 'next', 'few', 'days', 'during', 'this', 'partsession']
['in', 'the', 'meantime', 'i', 'should', 'like', 'to', 'observe', 'a', 'minute', 's', 'silence', 'as', 'a', 'number', 'of', 'members', 'have', 'requested', 'on', 'behalf', 'of', 'all', 'the', 'victim

                                                                                

In [123]:
#printing first 10 lines preprocessed Danish transcript in lowercase and after split
preprocess_DA=RDD_DA.map(tokenizer_lower)
for line in preprocess_DA.take(10):
    print(line)

[Stage 3:>                                                          (0 + 1) / 1]

['genoptagelse', 'af', 'sessionen']
['jeg', 'erklærer', 'europaparlamentets', 'session', 'der', 'blev', 'afbrudt', 'fredag', 'den', '17', 'december', 'for', 'genoptaget', 'endnu', 'en', 'gang', 'vil', 'jeg', 'ønske', 'dem', 'godt', 'nytår', 'og', 'jeg', 'håber', 'de', 'har', 'haft', 'en', 'god', 'ferie']
['som', 'de', 'kan', 'se', 'indfandt', 'det', 'store', 'år', '2000problem', 'sig', 'ikke', 'til', 'gengæld', 'har', 'borgerne', 'i', 'en', 'del', 'af', 'medlemslandene', 'været', 'ramt', 'af', 'meget', 'forfærdelige', 'naturkatastrofer']
['de', 'har', 'udtrykt', 'ønske', 'om', 'en', 'debat', 'om', 'dette', 'emne', 'i', 'løbet', 'af', 'mødeperioden']
['i', 'mellemtiden', 'ønsker', 'jeg', 'som', 'også', 'en', 'del', 'kolleger', 'har', 'anmodet', 'om', 'at', 'vi', 'iagttager', 'et', 'minuts', 'stilhed', 'til', 'minde', 'om', 'ofrene', 'for', 'bla', 'stormene', 'i', 'de', 'medlemslande', 'der', 'blev', 'ramt']
['jeg', 'opfordrer', 'dem', 'til', 'stående', 'at', 'iagttage', 'et', 'minuts', 

                                                                                

In [97]:
X=preprocess_EN.count() #counting number lines in EN preprocessed transcripts.
Y=preprocess_DA.count() #counting number lines in DA preprocessed transcripts.
print("line counts of preprocessed english transcript:",X)
print("line counts of preprocessed Danish transcript:",Y)
if X==Y:
    print("line count matches")
else:
    print("line count does not match")




line counts of preprocessed english transcript: 1968800
line counts of preprocessed Danish transcript: 1968800
line count matches




In [138]:
#function to count 10 frequently occuring words
def word_occurance(X):
    flat_X = X.flatMap(lambda i: i) #using flatmap to convert the words into single list
    map_X = flat_X.map(lambda i: (i,1))#to map words to number 1
    reduce_X = map_X.reduceByKey(add)#to count the words
    return reduce_X.takeOrdered(10,key=lambda i: -i[1]) #to print the words at top 10 occurences

In [139]:
##printing top 10 most frequently occured words in english transcript
Count_EN=word_occurance(preprocess_EN)
print("the 10 most frequently occured words in english transcript are :")
X=pd.DataFrame(Count_EN,columns=["word","count"])
print(X)



the 10 most frequently occured words in english transcript are :
   word    count
0   the  3729868
1    of  1766674
2    to  1640366
3   and  1401027
4    in  1157296
5  that   892029
6     a   823732
7    is   822269
8   for   569790
9    we   562591


                                                                                

In [126]:
#printing top 10 most frequently occured words in Danish transcript
Count_DA=word_occurance(preprocess_DA)
print("the 10 most frequently occured words in Danish transcript are :")
X=pd.DataFrame(Count_DA,columns=["word","count"])
print(X)



the 10 most frequently occured words in Danish transcript are :
  word    count
0   at  1549339
1   og  1437864
2    i  1274078
3   er  1057081
4  det   992398
5  for   980902
6   af   927153
7  til   786471
8   en   688735
9   de   675299


                                                                                

In [127]:
EN_1=preprocess_EN.zipWithIndex()
print(EN_1.take(5))
DA_1=preprocess_DA.zipWithIndex()
print(DA_1.take(5))



                                                                                

[(['resumption', 'of', 'the', 'session'], 0), (['i', 'declare', 'resumed', 'the', 'session', 'of', 'the', 'european', 'parliament', 'adjourned', 'on', 'friday', '17', 'december', '1999', 'and', 'i', 'would', 'like', 'once', 'again', 'to', 'wish', 'you', 'a', 'happy', 'new', 'year', 'in', 'the', 'hope', 'that', 'you', 'enjoyed', 'a', 'pleasant', 'festive', 'period'], 1), (['although', 'as', 'you', 'will', 'have', 'seen', 'the', 'dreaded', 'millennium', 'bug', 'failed', 'to', 'materialise', 'still', 'the', 'people', 'in', 'a', 'number', 'of', 'countries', 'suffered', 'a', 'series', 'of', 'natural', 'disasters', 'that', 'truly', 'were', 'dreadful'], 2), (['you', 'have', 'requested', 'a', 'debate', 'on', 'this', 'subject', 'in', 'the', 'course', 'of', 'the', 'next', 'few', 'days', 'during', 'this', 'partsession'], 3), (['in', 'the', 'meantime', 'i', 'should', 'like', 'to', 'observe', 'a', 'minute', 's', 'silence', 'as', 'a', 'number', 'of', 'members', 'have', 'requested', 'on', 'behalf', '

[Stage 11:>                                                         (0 + 1) / 1]

[(['genoptagelse', 'af', 'sessionen'], 0), (['jeg', 'erklærer', 'europaparlamentets', 'session', 'der', 'blev', 'afbrudt', 'fredag', 'den', '17', 'december', 'for', 'genoptaget', 'endnu', 'en', 'gang', 'vil', 'jeg', 'ønske', 'dem', 'godt', 'nytår', 'og', 'jeg', 'håber', 'de', 'har', 'haft', 'en', 'god', 'ferie'], 1), (['som', 'de', 'kan', 'se', 'indfandt', 'det', 'store', 'år', '2000problem', 'sig', 'ikke', 'til', 'gengæld', 'har', 'borgerne', 'i', 'en', 'del', 'af', 'medlemslandene', 'været', 'ramt', 'af', 'meget', 'forfærdelige', 'naturkatastrofer'], 2), (['de', 'har', 'udtrykt', 'ønske', 'om', 'en', 'debat', 'om', 'dette', 'emne', 'i', 'løbet', 'af', 'mødeperioden'], 3), (['i', 'mellemtiden', 'ønsker', 'jeg', 'som', 'også', 'en', 'del', 'kolleger', 'har', 'anmodet', 'om', 'at', 'vi', 'iagttager', 'et', 'minuts', 'stilhed', 'til', 'minde', 'om', 'ofrene', 'for', 'bla', 'stormene', 'i', 'de', 'medlemslande', 'der', 'blev', 'ramt'], 4)]


                                                                                

In [128]:
EN_2=EN_1.map(lambda i: (i[1],i[0]))
DA_2=DA_1.map(lambda i: (i[1],i[0]))
print(EN_2.take(5),DA_2.take(5))

[Stage 13:>                                                         (0 + 1) / 1]

[(0, ['resumption', 'of', 'the', 'session']), (1, ['i', 'declare', 'resumed', 'the', 'session', 'of', 'the', 'european', 'parliament', 'adjourned', 'on', 'friday', '17', 'december', '1999', 'and', 'i', 'would', 'like', 'once', 'again', 'to', 'wish', 'you', 'a', 'happy', 'new', 'year', 'in', 'the', 'hope', 'that', 'you', 'enjoyed', 'a', 'pleasant', 'festive', 'period']), (2, ['although', 'as', 'you', 'will', 'have', 'seen', 'the', 'dreaded', 'millennium', 'bug', 'failed', 'to', 'materialise', 'still', 'the', 'people', 'in', 'a', 'number', 'of', 'countries', 'suffered', 'a', 'series', 'of', 'natural', 'disasters', 'that', 'truly', 'were', 'dreadful']), (3, ['you', 'have', 'requested', 'a', 'debate', 'on', 'this', 'subject', 'in', 'the', 'course', 'of', 'the', 'next', 'few', 'days', 'during', 'this', 'partsession']), (4, ['in', 'the', 'meantime', 'i', 'should', 'like', 'to', 'observe', 'a', 'minute', 's', 'silence', 'as', 'a', 'number', 'of', 'members', 'have', 'requested', 'on', 'behalf'

                                                                                

In [129]:
ENDA_transcript=EN_2.join(DA_2)
print(ENDA_transcript.take(5))

[Stage 15:>                                                         (0 + 1) / 1]

[(4740, (['the', 'debate', 'is', 'closed'], ['forhandlingen', 'er', 'afsluttet'])), (7026, (['again', 'because', 'these', 'conciliations', 'are', 'not', 'entirely', 'straight', 'forward', 'we', 'have', 'to', 'thank', 'vicepresident', 'imbeni', 'for', 'his', 'exemplary', 'negotiating', 'skills', 'in', 'the', 'convoluted', 'field', 'of', 'ensuring', 'that', 'cultural', 'expenditure', 'is', 'wisely', 'deployed', 'and', 'given', 'to', 'us', 'in', 'our', 'community'], ['da', 'disse', 'forligsprocedurer', 'ikke', 'er', 'så', 'enkle', 'må', 'vi', 'endnu', 'en', 'gang', 'takke', 'næstformanden', 'hr', 'imbeni', 'for', 'hans', 'eksemplariske', 'forhandlingstalent', 'til', 'sikring', 'af', 'en', 'fornuftig', 'fordeling', 'af', 'kulturudgifterne', 'i', 'fællesskabet'])), (9168, (['nevertheless', 'we', 'definitely', 'have', 'to', 'do', 'more'], ['men', 'vi', 'skal', 'så', 'afgjort', 'gøre', 'mere', 'endnu'])), (15270, (['so', 'it', 'is', 'a', 'step', 'forward'], ['der', 'er', 'således', 'tale', 'o

                                                                                

In [130]:
ENDA_filter1=ENDA_transcript.filter(lambda i: i[1][0]!='' and i[1][1]!='')
print(ENDA_filter1.take(5))


[Stage 17:>                                                         (0 + 1) / 1]

[(912, (['the', 'rules', 'pertaining', 'to', 'the', 'steel', 'industry', 'were', 'drawn', 'up', 'on', '18', 'december', '1996'], ['for', 'stålindustriens', 'vedkommende', 'blev', 'de', 'gældende', 'regler', 'udarbejdet', 'den', '18', 'december', '1996'])), (1080, (['first', 'of', 'all', 'there', 'is', 'a', 'risk', 'that', 'the', 'decentralisation', 'of', 'powers', 'though', 'necessary', 'in', 'many', 'ways', 'will', 'cause', 'an', 'abnormal', 'increase', 'in', 'competitionrelated', 'initiatives', 'and', 'that', 'some', 'people', 'will', 'be', 'tempted', 'to', 'use', 'competition', 'law', 'not', 'as', 'a', 'means', 'to', 'be', 'resorted', 'to', 'when', 'all', 'else', 'fails', 'of', 'ensuring', 'the', 'smooth', 'and', 'predictable', 'functioning', 'of', 'the', 'markets', 'but', 'for', 'the', 'purposes', 'of', 'as', 'an', 'instrument', 'for', 'economic', 'and', 'industrial', 'policy', 'planning', 'and', 'interference', 'with', 'the', 'natural', 'workings', 'of', 'the', 'markets', 'themsel

                                                                                

In [131]:
ENDA_filter2=ENDA_filter1.filter(lambda i: len(i[1][0])<=15 and len(i[1][1])<=15)
print(ENDA_filter2.take(5))

[Stage 19:>                                                         (0 + 1) / 1]

[(72420, ([], ['2100'])), (97836, (['we', 'are', 'not', 'getting', 'into', 'a', 'debate', 'on', 'a', 'procedural', 'point'], ['vi', 'skal', 'ikke', 'til', 'at', 'starte', 'en', 'debat', 'om', 'et', 'procedurespørgsmål'])), (128598, (['that', 'is', 'why', 'i', 'would', 'like', 'to', 'concentrate', 'on', 'a', 'couple', 'of', 'key', 'areas'], ['derfor', 'vil', 'jeg', 'koncentrere', 'mig', 'om', 'et', 'par', 'væsentlige', 'punkter'])), (183090, (['that', 'concludes', 'questions', 'to', 'the', 'council'], ['hermed', 'er', 'spørgetiden', 'til', 'rådet', 'afsluttet'])), (193620, (['words', 'have', 'meaning'], ['ordene', 'har', 'en', 'betydning']))]


                                                                                

In [107]:
ENDA_filter3=ENDA_filter1.filter(lambda i: len(i[1][0]) == len(i[1][1]))
print(ENDA_filter3.take(5))




[(30546, (['thank', 'you', 'commissioner'], ['tak', 'hr', 'kommissær'])), (134118, (['i', 'am', 'emphatically', 'supporting', 'mrs', 'sanders', 'liability', 'proposals', 'and', 'also', 'the', 'proposals', 'regarding', 'consumer', 'information'], ['jeg', 'vil', 'udtrykkeligt', 'støtte', 'fru', 'sandersten', 'holtes', 'forslag', 'om', 'erstatningsansvar', 'og', 'også', 'forslagene', 'om', 'forbrugerinformation'])), (158322, (['question', 'no', '7', 'by', 'h048301'], ['spørgsmål', 'nr', '7', 'af', 'h048301'])), (225114, (['president'], ['formanden'])), (233874, (['the', 'laeken', 'declaration', 'contains', 'more', 'than', 'sixty', 'nonrhetorical', 'questions'], ['fru', 'formand', 'laekenerklæringen', 'indeholder', 'mere', 'end', '60', 'ikkeretoriske', 'spørgsmål']))]




In [137]:
ENDA_filter4 = ENDA_filter1.map(lambda i: list(zip(i[1][0],i[1][1])))
print(ENDA_filter4.take(5))

[Stage 35:>                                                         (0 + 1) / 1]

[[('only', 'først'), ('when', 'når'), ('an', 'et'), ('independent', 'uafhængigt'), ('judiciary', 'domstolsvæsen'), ('freedom', 'ytringsfrihed'), ('of', 'og'), ('expression', 'demokrati'), ('and', 'eksisterer'), ('democracy', 'som'), ('exist', 'andet'), ('as', 'end'), ('more', 'sound'), ('than', 'bites'), ('sound', 'og'), ('bites', 'når'), ('and', 'journalister'), ('when', 'oppositionspartier'), ('journalists', 'og'), ('opposition', 'ngoer'), ('parties', 'kan'), ('and', 'virke'), ('ngos', 'uden'), ('can', 'frygt'), ('operate', 'for'), ('without', 'straf'), ('fear', 'kan'), ('of', 'europa'), ('retribution', 'stille'), ('can', 'sig'), ('europe', 'solidarisk'), ('stand', 'med'), ('in', 'rusland')], [('we', 'vi'), ('intend', 'vil'), ('to', 'træffe'), ('take', 'de'), ('the', 'relevante'), ('relevant', 'beslutninger'), ('decisions', 'på'), ('in', 'dette'), ('this', 'område'), ('area', 'i'), ('in', 'december')], [('it', 'det'), ('ensures', 'sikrer'), ('that', 'at'), ('airbus', 'airbus'), ('doe

                                                                                

In [140]:
red_occurance = word_occurance(ENDA_filter4)
print("the most frequently occured words in English and Danish transcript are :")
X=pd.DataFrame(red_occurance,columns=["word","count"])
print(X)



the most frequently occured words in English and Danish transcript are :
         word   count
0    (i, jeg)  234932
1   (and, og)  194271
2    (we, vi)  182649
3    (is, er)  172577
4  (that, at)  141641
5     (in, i)  128235
6    (to, at)  127938
7  (the, den)  112369
8   (it, det)  112076
9   (the, at)  105105


                                                                                

In [141]:
spark_context.stop()