In [2]:
import pyspark
from pyspark.sql import SparkSession
from operator import add
import string
import re

In [4]:
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.119:7077") \
        .appName("weilin_PartA")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",2)\
        .config("spark.driver.port",9998)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()
# Old API (RDD)
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")

In [5]:
spark_context

# A.1

In [7]:
English = spark_context.textFile("hdfs://192.168.2.119:9000/europarl/europarl-v7.sv-en.en")

English_lines = English.count()
print("The line counts of English: ", English_lines)

[Stage 0:>                                                          (0 + 2) / 2]

The line counts of English:  1862234


                                                                                

In [8]:
Swedish = spark_context.textFile("hdfs://192.168.2.119:9000/europarl/europarl-v7.sv-en.sv")

Swedish_lines = Swedish.count()
print("The line counts of Swedish: ", Swedish_lines)



The line counts of Swedish:  1862234


                                                                                

In [9]:
if English_lines == Swedish_lines:
    print("The line counts of English and the line counts of Swedish are the same")
else:
    print("The line counts of English and the line counts of Swedish are not the same")

The line counts of English and the line counts of Swedish are the same


In [27]:
Eng_partitions = English.getNumPartitions()
print("The number of partitions for English file",Eng_partitions)
Swe_partitions = Swedish.getNumPartitions()
print("The number of partitions for Swedish file",Swe_partitions)

The number of partitions for English file 2
The number of partitions for Swedish file 3


# A.2

In [11]:
def PreProcess(lines):
    # Lowercase the text
    lines = lines.lower()

    # Tokenize the text
    # lines = lines.translate(str.maketrans('', '', string.punctuation)).split()
    lines = re.compile('\w+').findall(lines)
    return lines

In [12]:
Eng_Latest = English.map(PreProcess)
print("English Lowercase:\n", Eng_Latest.first())

Swe_Latest = Swedish.map(PreProcess)
print("Swedish Lowercase:\n", Swe_Latest.first())

English Lowercase:
 ['resumption', 'of', 'the', 'session']
Swedish Lowercase:
 ['återupptagande', 'av', 'sessionen']


In [13]:
Eng_lines_counts = Eng_Latest.count()
Swe_lines_counts = Swe_Latest.count()

print("The line counts of English Lowercase:", Eng_lines_counts)
print("The line counts of Swedish Lowercase:", Swe_lines_counts)



The line counts of English Lowercase: 1862234
The line counts of Swedish Lowercase: 1862234


                                                                                

# A.3

In [14]:
# mapper the words 
Eng_map = English.flatMap(PreProcess).map(lambda x: (x,1))
print(Eng_map.take(10))

Swe_map = Swedish.flatMap(PreProcess).map(lambda x: (x,1))
print(Swe_map.take(10))

[('resumption', 1), ('of', 1), ('the', 1), ('session', 1), ('i', 1), ('declare', 1), ('resumed', 1), ('the', 1), ('session', 1), ('of', 1)]
[('återupptagande', 1), ('av', 1), ('sessionen', 1), ('jag', 1), ('förklarar', 1), ('europaparlamentets', 1), ('session', 1), ('återupptagen', 1), ('efter', 1), ('avbrottet', 1)]


In [15]:
Eng_freq = Eng_map.reduceByKey(lambda x,y:x+y).sortBy(lambda x:x[1], False).take(10)
Swe_freq = Swe_map.reduceByKey(lambda x,y:x+y).sortBy(lambda x:x[1], False).take(10)

                                                                                

In [16]:
print("10 most frequent words in English:\n", Eng_freq)
print("10 most frequent words in Swedish:\n", Swe_freq)

10 most frequent words in English:
 [('the', 3506102), ('of', 1662891), ('to', 1545280), ('and', 1320053), ('in', 1099408), ('that', 839196), ('a', 776918), ('is', 774969), ('for', 538487), ('we', 526580)]
10 most frequent words in Swedish:
 [('att', 1709969), ('och', 1351079), ('i', 1054764), ('det', 953045), ('som', 917621), ('för', 915166), ('av', 740770), ('är', 701851), ('en', 636939), ('vi', 546126)]


# A.4

In [28]:
# Key the lines by their line number
en_1 = Eng_Latest.zipWithIndex()
sv_1 = Swe_Latest.zipWithIndex()

print(en_1.take(3))
print(sv_1.take(3))

                                                                                

[(['resumption', 'of', 'the', 'session'], 0), (['i', 'declare', 'resumed', 'the', 'session', 'of', 'the', 'european', 'parliament', 'adjourned', 'on', 'friday', '17', 'december', '1999', 'and', 'i', 'would', 'like', 'once', 'again', 'to', 'wish', 'you', 'a', 'happy', 'new', 'year', 'in', 'the', 'hope', 'that', 'you', 'enjoyed', 'a', 'pleasant', 'festive', 'period'], 1), (['although', 'as', 'you', 'will', 'have', 'seen', 'the', 'dreaded', 'millennium', 'bug', 'failed', 'to', 'materialise', 'still', 'the', 'people', 'in', 'a', 'number', 'of', 'countries', 'suffered', 'a', 'series', 'of', 'natural', 'disasters', 'that', 'truly', 'were', 'dreadful'], 2)]
[(['återupptagande', 'av', 'sessionen'], 0), (['jag', 'förklarar', 'europaparlamentets', 'session', 'återupptagen', 'efter', 'avbrottet', 'den', '17', 'december', 'jag', 'vill', 'på', 'nytt', 'önska', 'er', 'ett', 'gott', 'nytt', 'år', 'och', 'jag', 'hoppas', 'att', 'ni', 'haft', 'en', 'trevlig', 'semester'], 1), (['som', 'ni', 'kunnat', '

In [18]:
# Swap the key and value - so that the line number is the key
en_2 = en_1.map(lambda x:(x[1],x[0]))
sv_2 = sv_1.map(lambda x:(x[1],x[0]))

print(en_2.take(3))
print(sv_2.take(3))

[(0, ['resumption', 'of', 'the', 'session']), (1, ['i', 'declare', 'resumed', 'the', 'session', 'of', 'the', 'european', 'parliament', 'adjourned', 'on', 'friday', '17', 'december', '1999', 'and', 'i', 'would', 'like', 'once', 'again', 'to', 'wish', 'you', 'a', 'happy', 'new', 'year', 'in', 'the', 'hope', 'that', 'you', 'enjoyed', 'a', 'pleasant', 'festive', 'period']), (2, ['although', 'as', 'you', 'will', 'have', 'seen', 'the', 'dreaded', 'millennium', 'bug', 'failed', 'to', 'materialise', 'still', 'the', 'people', 'in', 'a', 'number', 'of', 'countries', 'suffered', 'a', 'series', 'of', 'natural', 'disasters', 'that', 'truly', 'were', 'dreadful'])]
[(0, ['återupptagande', 'av', 'sessionen']), (1, ['jag', 'förklarar', 'europaparlamentets', 'session', 'återupptagen', 'efter', 'avbrottet', 'den', '17', 'december', 'jag', 'vill', 'på', 'nytt', 'önska', 'er', 'ett', 'gott', 'nytt', 'år', 'och', 'jag', 'hoppas', 'att', 'ni', 'haft', 'en', 'trevlig', 'semester']), (2, ['som', 'ni', 'kunnat'

In [29]:
# Join the two RDDs together according to the line number key
join_on_en_sv = en_2.join(sv_2)
print(join_on_en_sv.count())
print(join_on_en_sv.take(3))

                                                                                

1862234


[Stage 57:>                                                         (0 + 1) / 1]

[(909040, (['the', 'commission', 'will', 'then', 'propose', 'to', 'adhere', 'to', 'the', 'relevant', 'unece', 'regulation', 'with', 'revised', 'limit', 'values'], ['kommissionen', 'kommer', 'sedan', 'att', 'föreslå', 'att', 'man', 'följer', 'de', 'relevanta', 'ece', 'föreskrifterna', 'med', 'reviderade', 'gränsvärden'])), (909580, (['i', 'would', 'like', 'to', 'mention', 'the', 'fact', 'that', 'the', 'directive', 'we', 'refer', 'to', 'offers', 'the', 'possibility', 'to', 'the', 'member', 'states', 'to', 'introduce', 'a', 'road', 'tax', 'and', 'for', 'its', 'calculation', 'to', 'take', 'into', 'account', 'more', 'cost', 'elements', 'than', 'before'], ['jag', 'vill', 'nämna', 'att', 'det', 'direktiv', 'som', 'vi', 'hänvisar', 'till', 'ger', 'medlemsstaterna', 'möjlighet', 'att', 'införa', 'en', 'vägskatt', 'och', 'för', 'att', 'kunna', 'beräkna', 'denna', 'får', 'de', 'en', 'möjlighet', 'att', 'beakta', 'fler', 'kostnadsfaktorer', 'än', 'tidigare'])), (919805, (['we', 'will', 'set', 'our

                                                                                

In [30]:
# Filter to exclude line pairs that have an empty/missing “corresponding” sentence
en_sv_4 = join_on_en_sv.filter(lambda x: x[1][0] != [''] and x[1][1] != [''])

print(en_sv_4.take(3))

[Stage 59:>                                                         (0 + 1) / 1]

[(140, (['the', 'report', 'looks', 'at', 'the', 'issue', 'of', 'harmonising', 'the', 'examination', 'requirements', 'for', 'safety', 'advisors', 'working', 'in', 'the', 'areas', 'of', 'transportation', 'of', 'dangerous', 'goods', 'by', 'road', 'rail', 'and', 'inland', 'waterway'], ['han', 'behandlar', 'frågan', 'om', 'harmonisering', 'av', 'examineringskraven', 'för', 'säkerhetsrådgivare', 'för', 'transport', 'av', 'farligt', 'gods', 'på', 'väg', 'järnväg', 'och', 'inre', 'vattenvägar'])), (445, (['mr', 'president', 'mrs', 'schroedter', 's', 'report', 'undoubtedly', 'contains', 'several', 'important', 'observations', 'and', 'i', 'would', 'like', 'to', 'congratulate', 'her', 'on', 'that'], ['herr', 'talman', 'schroedters', 'betänkande', 'innehåller', 'utan', 'tvivel', 'ganska', 'många', 'viktiga', 'iakttagelser', 'och', 'därför', 'vill', 'jag', 'gratulera', 'henne'])), (1770, (['finally', 'after', 'seattle', 'do', 'you', 'really', 'think', 'you', 'can', 'advocate', 'increasing', 'the', 

                                                                                

In [31]:
# Filter to leave only pairs of sentences with less than 15 number of words per sentence, this should give a more reliable translation
en_sv_5 = en_sv_4.filter(lambda x: len(x[1][0]) <= 15 or len(x[1][1]) <= 15)
print(en_sv_5.take(3))

[Stage 61:>                                                         (0 + 1) / 1]

[(260, (['a', 'major', 'part', 'of', 'wales', 'as', 'you', 'know', 'has', 'been', 'granted', 'objective', '1', 'status', 'under', 'the', 'structural', 'funds', 'programme'], ['huvuddelen', 'av', 'wales', 'har', 'som', 'ni', 'känner', 'till', 'erkänts', 'mål', '1', 'status', 'enligt', 'strukturfondsprogrammen'])), (480, (['the', 'european', 'commission', 's', 'sixth', 'report', 'presents', 'very', 'valuable', 'conclusions'], ['europeiska', 'kommissionens', 'sjätte', 'rapport', 'innehåller', 'värdefulla', 'slutsatser'])), (800, (['competition', 'between', 'the', 'regions', 'will', 'certainly', 'strengthen', 'rather', 'than', 'weaken', 'the', 'european', 'union'], ['konkurrensen', 'mellan', 'regionerna', 'kommer', 'säkerligen', 'att', 'stärka', 'europeiska', 'unionen', 'och', 'inte', 'försvaga', 'den']))]


                                                                                

In [32]:
# Filter to leave only pairs of sentences with the same number of words in each sentence
en_sv_6 = en_sv_5.filter(lambda x: len(x[1][0]) == len(x[1][1]))
print(en_sv_6.take(3))

[Stage 63:>                                                         (0 + 1) / 1]

[(916030, (['take', 'nothing', 'for', 'granted', 'colleagues', 'until', 'it', 'is', 'done'], ['ta', 'ingenting', 'för', 'givet', 'kolleger', 'innan', 'detta', 'har', 'skett'])), (916320, (['we', 'know', 'nevertheless', 'that', 'mount', 'triglav', 'in', 'slovenia', 'symbolises', 'determination', 'and', 'accomplishment'], ['vi', 'vet', 'emellertid', 'att', 'berget', 'triglav', 'i', 'slovenien', 'symboliserar', 'beslutsamhet', 'och', 'förverkligande'])), (917845, (['we', 'need', 'to', 'break', 'it', 'down', 'to', 'make', 'practical', 'measures'], ['vi', 'måste', 'dela', 'upp', 'det', 'för', 'att', 'vidta', 'praktiska', 'åtgärder']))]


                                                                                

In [33]:
# For each sentence pair, map so that you pair each (in order) word in the two sentences.
en_sv_7 = en_sv_6.map(lambda x: list(zip(x[1][0],x[1][1])))
print(en_sv_7.take(3))

[Stage 65:>                                                         (0 + 1) / 1]

[[('this', 'parlamentet'), ('parliament', 'ser'), ('anxiously', 'fram'), ('awaits', 'emot'), ('this', 'ett'), ('complete', 'sådant'), ('reform', 'fullständigt'), ('programme', 'reformprogram')], [('allow', 'låt'), ('me', 'mig'), ('just', 'också'), ('to', 'helt'), ('go', 'kort'), ('very', 'gå'), ('briefly', 'in'), ('into', 'på'), ('a', 'en'), ('second', 'annan'), ('matter', 'sak')], [('it', 'det'), ('is', 'är'), ('nonetheless', 'dock'), ('clear', 'klart'), ('that', 'att'), ('the', 'denna'), ('cost', 'kostnad'), ('will', 'kommer'), ('be', 'att'), ('borne', 'bäras'), ('jointly', 'gemensamt'), ('by', 'av'), ('manufacturers', 'producenter'), ('and', 'och'), ('consumers', 'konsumenter')]]


                                                                                

In [34]:
# count the number of occurrences of the word-translation-pairs.
wordPair_counts = en_sv_7.map(lambda wp: len(wp))
total_wordPairs = wordPair_counts.reduce(add)
print("total word-translation-pairs:", total_wordPairs)



total word-translation-pairs: 1013530


                                                                                

In [35]:
# mapper the word pairs for compute the frequency
en_sv_9 = en_sv_6.flatMap(lambda x: list(zip(x[1][0],x[1][1]))).map(lambda x:(x,1))
print(en_sv_9.take(3))

[Stage 69:>                                                         (0 + 1) / 1]

[(('take', 'ta'), 1), (('nothing', 'ingenting'), 1), (('for', 'för'), 1)]


                                                                                

In [36]:
# Print some of the most frequently occurring pairs of words.
en_sv_9_freq = en_sv_9.reduceByKey(add).sortBy(lambda x:x[1], False).take(20)
print(en_sv_9_freq)

                                                                                

[(('is', 'är'), 15983), (('we', 'vi'), 11878), (('i', 'jag'), 11378), (('and', 'och'), 9528), (('this', 'detta'), 6990), (('it', 'det'), 6697), (('a', 'en'), 6330), (('in', 'i'), 5764), (('not', 'inte'), 5463), (('to', 'att'), 4802), (('that', 'det'), 4475), (('that', 'att'), 4174), (('the', 'den'), 4060), (('a', 'ett'), 4018), (('have', 'har'), 3974), (('for', 'för'), 3772), (('of', 'av'), 3717), (('applause', 'applåder'), 3358), (('the', 'det'), 3308), (('are', 'är'), 3143)]
