# Section A - Working with the RDD API

In [1]:
from pyspark.sql import SparkSession
from operator import add

# Build (or reuse) the Spark session for this notebook against the standalone
# master. Dynamic allocation is switched on together with the shuffle service,
# idle executors time out after 30s, and each executor is given 2 cores.
# The driver and block-manager ports are pinned to fixed values.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.113:7077")
    .appName("Siwei Fu A3 RDD")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 2)
    .config("spark.driver.port", 9998)
    .config("spark.blockManager.port", 10005)
    .getOrCreate()
)

# The RDD API used throughout this notebook hangs off the SparkContext.
spark_context = spark_session.sparkContext
spark_context.setLogLevel("INFO")

## Question A.1

In [2]:
# The code in this block gets the file list of the given HDFS folder.
# It is used to separate each other-language file from its English transcript.
# Referenced from https://stackoverflow.com/questions/35750614/pyspark-get-list-of-files-directories-on-hdfs-path

URI           = spark_context._gateway.jvm.java.net.URI
Path          = spark_context._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = spark_context._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = spark_context._gateway.jvm.org.apache.hadoop.conf.Configuration

fs = FileSystem.get(URI("hdfs://192.168.2.113:9000"), Configuration())
status = fs.listStatus(Path('/europarl/'))

English_file = []
Other_file = []
for fileStatus in status:
    temp = str(fileStatus.getPath())
    if temp.endswith('.en'): # the '.en' suffix marks the English side of a language pair.
        English_file.append(temp)
    else:
        Other_file.append(temp)

# Later cells pair English_file[i] with Other_file[i]. listStatus() does not
# promise any particular ordering, so sort both lists by the part of the path
# they share (everything before the final language suffix) to keep the
# pairing deterministic.
English_file.sort(key=lambda path: path.rpartition('.')[0])
Other_file.sort(key=lambda path: path.rpartition('.')[0])

# textFile() accepts a comma-separated list of paths, giving one RDD per side.
E_lines = spark_context.textFile(','.join(English_file))
O_lines = spark_context.textFile(','.join(Other_file))

print(len(English_file))
print(len(Other_file))
print([i.split('/')[-1].partition('.')[2] for i in English_file])
print([i.split('/')[-1].partition('.')[2] for i in Other_file])



20
20
['bg-en.en', 'cs-en.en', 'da-en.en', 'de-en.en', 'el-en.en', 'es-en.en', 'et-en.en', 'fi-en.en', 'fr-en.en', 'hu-en.en', 'it-en.en', 'lt-en.en', 'lv-en.en', 'nl-en.en', 'pl-en.en', 'pt-en.en', 'ro-en.en', 'sk-en.en', 'sl-en.en', 'sv-en.en']
['bg-en.bg', 'cs-en.cs', 'da-en.da', 'de-en.de', 'el-en.el', 'es-en.es', 'et-en.et', 'fi-en.fi', 'fr-en.fr', 'hu-en.hu', 'it-en.it', 'lt-en.lt', 'lv-en.lv', 'nl-en.nl', 'pl-en.pl', 'pt-en.pt', 'ro-en.ro', 'sk-en.sk', 'sl-en.sl', 'sv-en.sv']


In [3]:
# A.1.1 Read the English transcripts with Spark, and count the number of lines.
# A.1.2 Do the same with the other language (so that you have a separate lineage of RDDs for each).
# A.1.3 Verify that the line counts are the same for the two languages.
    # Answer: Yes, the number of lines for each language file is the same,
    #         and the total line number is also the same: 24652024.
# A.1.4 Count the number of partitions.
    # Answer: The number of partitions for the English files is 38, and for the other languages it is 40.

line_num = []   # (language pair label, English line count, other-language line count)
Flag = True     # stays True only while every file pair has matching line counts

for i in range(len(English_file)):
    # count() is the built-in line counter; unlike map(...).reduce(add) it
    # returns 0 for an empty file instead of raising.
    num_e = spark_context.textFile(English_file[i]).count()
    num_o = spark_context.textFile(Other_file[i]).count()
    line_num.append((Other_file[i][-2:]+'-en',num_e,num_o))
    if num_e != num_o:
        Flag = False

print(f"The number of lines for each language is the same? Anwser is: {Flag}.\n")
print(f"Total line from each file is {line_num}.\n")

# Totals over the combined RDDs built from all files of each side.
etotal_lines = E_lines.count()
ototal_lines = O_lines.count()

print(f"Total line from English transcripts is {etotal_lines}.")
print(f"Total line from transcripts in other languages is {ototal_lines}.")

print(f'The partition for finding out total line number for English is {E_lines.getNumPartitions()}.')
print(f'The partition for finding out total line number for other languages is {O_lines.getNumPartitions()}.')

The number of lines for each language is the same? Anwser is: True.

Total line from each file is [('bg-en', 406934, 406934), ('cs-en', 646605, 646605), ('da-en', 1968800, 1968800), ('de-en', 1920209, 1920209), ('el-en', 1235976, 1235976), ('es-en', 1965734, 1965734), ('et-en', 651746, 651746), ('fi-en', 1924942, 1924942), ('fr-en', 2007723, 2007723), ('hu-en', 624934, 624934), ('it-en', 1909115, 1909115), ('lt-en', 635146, 635146), ('lv-en', 637599, 637599), ('nl-en', 1997775, 1997775), ('pl-en', 632565, 632565), ('pt-en', 1960407, 1960407), ('ro-en', 399375, 399375), ('sk-en', 640715, 640715), ('sl-en', 623490, 623490), ('sv-en', 1862234, 1862234)].

Total line from English transcripts is 24652024.
Total line from transcripts in other languages is 24652024.
The partition for finding out total line number for English is 38.
The partition for finding out total line number for other languages is 40.


## Question A.2

In [4]:
#A.2.1 Pre-process the text from both RDDs by doing the following:
#    ● Lowercase the text
#    ● Tokenize the text (split on space)
#    Hint: define a function to run in your driver application to avoid writing this code twice.
#A.2.2 Inspect 10 entries from each of your RDDs to verify your pre-processing.
    # Explanation: The requirement here is not that clear,
    #              so an RDD holding, for each line, the lowercased line and its token list is returned.
#A.2.3 Verify that the line counts still match after the pre-processing.
    # Answer: The line counts are still the same: 24652024.

def prepocess(line):
    """Lowercase a line and tokenize it on single spaces.

    Args:
        line: one line of text (str).

    Returns:
        A 2-tuple ``(lowercased_line, tokens)`` where ``tokens`` is the
        lowercased line split on the space character. Consecutive spaces
        therefore yield empty-string tokens.
    """
    lowered = line.lower()
    return lowered, lowered.split(' ')

# Apply the shared pre-processing to both lineages; each element becomes
# (lowercased line, list of tokens).
LoTo_E_lines = E_lines.map(prepocess)
LoTo_O_lines = O_lines.map(prepocess)

# Inspect 10 entries from each RDD to verify the pre-processing.
print(LoTo_E_lines.take(10))
print(LoTo_O_lines.take(10))

# Line counts after pre-processing. count() replaces map(...).reduce(add):
# it is the built-in counter and does not raise on an empty RDD.
ToLiPr_Eng = LoTo_E_lines.count()
ToLiPr_Oth = LoTo_O_lines.count()

print(f'The number of total lines for English and other langaue is {ToLiPr_Eng},{ToLiPr_Oth} respectively.')

[('membership of parliament: see minutes', ['membership', 'of', 'parliament:', 'see', 'minutes']), ('approval of minutes of previous sitting: see minutes', ['approval', 'of', 'minutes', 'of', 'previous', 'sitting:', 'see', 'minutes']), ('membership of parliament: see minutes', ['membership', 'of', 'parliament:', 'see', 'minutes']), ('verification of credentials: see minutes', ['verification', 'of', 'credentials:', 'see', 'minutes']), ('documents received: see minutes', ['documents', 'received:', 'see', 'minutes']), ('written statements and oral questions (tabling): see minutes', ['written', 'statements', 'and', 'oral', 'questions', '(tabling):', 'see', 'minutes']), ('petitions: see minutes', ['petitions:', 'see', 'minutes']), ('texts of agreements forwarded by the council: see minutes', ['texts', 'of', 'agreements', 'forwarded', 'by', 'the', 'council:', 'see', 'minutes']), ("action taken on parliament's resolutions: see minutes", ['action', 'taken', 'on', "parliament's", 'resolutions:'

## Question A.3

In [5]:
#A.3.1 Use Spark to compute the 10 most frequently occurring words in the English language corpus.
     # Repeat for the other language.
#A.3.2 Verify that your results are reasonable.

def top_words(lines, n=10):
    """Return the n most frequent words of an RDD of text lines.

    Lines are lowercased and split on single spaces. Empty tokens (produced
    by blank lines or consecutive spaces) are dropped so they are not
    counted as words.

    Args:
        lines: RDD of strings, one element per line.
        n: how many (word, count) pairs to return.

    Returns:
        A list of up to n ``(word, count)`` tuples, highest count first.
    """
    return lines\
          .flatMap(lambda line: line.lower().split(' '))\
          .filter(lambda word: word != '')\
          .map(lambda word: (word, 1))\
          .reduceByKey(add)\
          .takeOrdered(n, key=lambda x: -x[1])


E_wordMR = top_words(E_lines)
print(f'The top 10 words in English are:\n')
print(f'{E_wordMR}\n')

print(f'The top 10 words in other languages are as following:\n')
for file in Other_file:
    # One RDD per file so that every language gets its own ranking.
    wordMR = top_words(spark_context.textFile(file))
    print(f'{file[-2:]}: {wordMR}\n')
 

The top 10 words in English are:

[('the', 46739644), ('of', 22254111), ('to', 20520872), ('and', 17433647), ('in', 14589270), ('that', 10544465), ('a', 10332226), ('is', 10077786), ('for', 7219155), ('we', 7001132)]

The top 10 words in other languages are as following:

bg: [('на', 604938), ('да', 330186), ('и', 328079), ('за', 261271), ('в', 228108), ('от', 168749), ('се', 150472), ('е', 129681), ('че', 114145), ('с', 95262)]

cs: [('a', 484493), ('v', 292767), ('se', 214188), ('na', 200086), ('že', 189615), ('je', 189496), ('o', 128350), ('pro', 111549), ('s', 93065), ('k', 83879)]

da: [('at', 1548583), ('og', 1435787), ('i', 1257318), ('er', 1029558), ('for', 931543), ('det', 921662), ('af', 908608), ('til', 752712), ('en', 677220), ('de', 663070)]

de: [('die', 1980469), ('der', 1710353), ('und', 1337721), ('in', 781359), ('zu', 618872), ('den', 577654), ('wir', 489036), ('für', 478326), ('ich', 469022), ('das', 466126)]

el: [('να', 1032300), ('και', 1013616), ('της', 759613), 

## Question A.4

A.4.1 Use this parallel corpus to mine some translations in the form of word pairs, for the two languages. Do this by pairing words found on short lines with the same number of words respectively. We (incorrectly) assume the words stay in the same order when translated.

Follow this approach. Work with the pair of RDDs you created in question A.2.
Hint: make a new pair of RDDs for each step, sv_1, en_1, sv_2, en_2, ...

1. Key the lines by their line number (hint: zipWithIndex()).
2. Swap the key and value - so that the line number is the key.
3. Join the two RDDs together according to the line number key, so you have pairs of matching lines.
4. Filter to exclude line pairs that have an empty/missing “corresponding” sentence.
5. Filter to leave only pairs of sentences with a small number of words per sentence, this should give a more reliable translation (you can experiment).
6. Filter to leave only pairs of sentences with the same number of words in each sentence.
7. For each sentence pair, map so that you pair each (in order) word in the two sentences. We no longer need the line numbers. (hint: use python’s built in zip() function)
8. Use reduce to count the number of occurrences of the word-translation-pairs.
9. Print some of the most frequently occurring pairs of words.

Do your translations seem reasonable? Use a dictionary to check a few (don’t worry, you won’t be marked down for incorrect translations!).

In [6]:
def kv_rev(rdd):
    """Key every element of an RDD by its position.

    Args:
        rdd: an RDD (anything offering ``zipWithIndex()`` and ``map()``).

    Returns:
        A new RDD of ``(index, element)`` pairs, i.e. the output of
        ``zipWithIndex()`` with key and value swapped.
    """
    def index_first(pair):
        # zipWithIndex() yields (element, index); flip it so the index is the key.
        element, index = pair
        return (index, element)

    return rdd.zipWithIndex().map(index_first)

j = len(Other_file)

# A for loop over file indices is used so that a different range() can be
# passed to analyze only a subset of the files, which lowers the number of
# executor cores needed and the processing time. With j as the bound, all
# files are analyzed.

n = 10    # Keep only sentence pairs with fewer than n words per sentence.
num = 10  # Number of most frequent word pairs to print per language.


def _short_matching_pair(sentences, max_words):
    """Tell whether an (english, other) sentence pair is usable for pairing.

    A pair is kept when both sentences are non-empty after stripping, both
    have fewer than max_words space-separated words, and both have the same
    number of words.
    """
    english, other = sentences[0].strip(), sentences[1].strip()
    if not (english and other):
        return False
    english_len = len(english.split(' '))
    # Equal lengths plus english_len < max_words implies the other side is short too.
    return english_len < max_words and english_len == len(other.split(' '))


def _clean_word(word):
    """Remove '.' and '-' characters from a word."""
    return word.replace('.', '').replace('-', '')


def _word_pairs(sentences):
    """Pair the words of two sentences in order.

    Words are lowercased and cleaned; pairs where either side is empty or
    numeric after cleaning are dropped.
    """
    english_words = sentences[0].strip().lower().split(' ')
    other_words = sentences[1].strip().lower().split(' ')
    pairs = []
    for english_word, other_word in zip(english_words, other_words):
        english_word = _clean_word(english_word)
        other_word = _clean_word(other_word)
        if not (english_word and other_word):
            continue
        if english_word.isnumeric() or other_word.isnumeric():
            continue
        pairs.append((english_word, other_word))
    return pairs


for i in range(j):
    e = spark_context.textFile(English_file[i])
    o = spark_context.textFile(Other_file[i])
    # Join on line number so each element is (line_no, (english, other)).
    pairedrdd = kv_rev(e).join(kv_rev(o))

    npairedrdd = pairedrdd.filter(lambda kv: _short_matching_pair(kv[1], n))

    # Words are paired per sentence with Python's zip() inside one flatMap.
    # RDD.zip() over two separately evaluated flatMaps would need identical
    # per-partition element counts and ordering, which is not guaranteed
    # after the shuffle done by join(), and would compute the join twice.
    wordpair = npairedrdd.flatMap(lambda kv: _word_pairs(kv[1]))\
                .map(lambda pair: (pair, 1))\
                .reduceByKey(add)\
                .takeOrdered(num, key=lambda x: -x[1])
    print(f'en--{Other_file[i][-2:]} top {num} matches:\n{wordpair}\n')


en--bg top 10 matches:
[(('is', 'е'), 1287), (('of', 'на'), 1154), (('see', 'вж'), 976), (('(applause)', '(ръкопляскания)'), 912), (('this', 'това'), 815), (('written', 'писмени'), 687), (('minutes', 'протокола'), 665), (('and', 'и'), 643), (('that', 'това'), 601), (('(rule', '(член'), 567)]

en--cs top 10 matches:
[(('(applause)', '(potlesk)'), 1749), (('is', 'je'), 1650), (('and', 'a'), 948), (('written', 'písemná'), 821), (('(rule', '(článek'), 803), (('statements', 'prohlášení'), 796), (('see', 'viz'), 725), (('minutes', 'zápis'), 719), (('that', 'to'), 660), (('thank', 'děkuji'), 617)]

en--da top 10 matches:
[(('is', 'er'), 7808), (('we', 'vi'), 3406), (('i', 'jeg'), 3041), (('that', 'det'), 2674), (('it', 'det'), 2272), (('this', 'det'), 2016), (('not', 'ikke'), 2000), (('a', 'en'), 1944), (('and', 'og'), 1916), (('(applause)', '(bifald)'), 1909)]

en--de top 10 matches:
[(('is', 'ist'), 10266), (('the', 'die'), 8644), (('debate', 'aussprache'), 4028), (('we', 'wir'), 3823), (('

In [7]:
# Stop the Spark session created in the first cell now that all questions have been run.
spark_session.stop()