In [7]:
from pyspark.sql import SparkSession

# (8 cores, 16gb per machine) x 5 = 40 cores

# New API
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.87:7077") \
        .appName("abdulbasit_partA")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",4)\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

In [8]:
spark_context.setLogLevel("DEBUG")

def add(a, b):
    # commutative and associative!
    return a + b

rdd = spark_context.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3)

result = rdd.filter(lambda x: x % 2 == 0)\
            .map(lambda x: x ** 2)\
            .reduce(add)

print(result)

#See: http://spark.apache.org/docs/2.4.5/api/python/pyspark.html

220


In [9]:
lines = spark_context.textFile("hdfs://192.168.2.87:9000/europarl/europarl-v7.fi-en.en")
print(lines.first())

Resumption of the session


In [11]:
# Part A1.1
#count number of lines

def countno_Lines(myrdd):
    line = myrdd.map(lambda s: 1)
    line.persist()
    totalno_Line = line.reduce(lambda a,b:a+b)
    return totalno_Line

en_1 = spark_context.textFile("hdfs://192.168.2.87:9000/europarl/europarl-v7.fi-en.en")
totalno_Lines_EN = countno_Lines(en_1)
print(totalno_Lines_EN)

1924942


In [12]:
# A1.2

fi_1 = spark_context.textFile("hdfs://192.168.2.87:9000/europarl/europarl-v7.fi-en.fi")
totalno_Lines_fi = countno_Lines(fi_1)
print(totalno_Lines_fi)

1924942


In [13]:
# A1.3

print("Do both the files have same number of lines? "+ str(totalno_Lines_EN==totalno_Lines_fi) )

Do both the files have same number of lines? True


In [14]:
# A1.4

print("Number of Partitions for English transcripts: "+ str(en_1.getNumPartitions()))
print("Number of Partitions for Finnish transcripts: "+ str(fi_1.getNumPartitions()))

Number of Partitions for English transcripts: 3
Number of Partitions for Finnish transcripts: 3


In [15]:
# Fucntion of text preprocessing

def pre_process(myrdd):
    myrdd = myrdd.lower()
    myrdd = myrdd.split(' ')
    return myrdd

en_2 = en_1.map(pre_process)
fi_2 = fi_1.map(pre_process)

In [16]:
# A2.2 Part 1

en_2.take(10)

[['resumption', 'of', 'the', 'session'],
 ['i',
  'declare',
  'resumed',
  'the',
  'session',
  'of',
  'the',
  'european',
  'parliament',
  'adjourned',
  'on',
  'friday',
  '17',
  'december',
  '1999,',
  'and',
  'i',
  'would',
  'like',
  'once',
  'again',
  'to',
  'wish',
  'you',
  'a',
  'happy',
  'new',
  'year',
  'in',
  'the',
  'hope',
  'that',
  'you',
  'enjoyed',
  'a',
  'pleasant',
  'festive',
  'period.'],
 ['although,',
  'as',
  'you',
  'will',
  'have',
  'seen,',
  'the',
  'dreaded',
  "'millennium",
  "bug'",
  'failed',
  'to',
  'materialise,',
  'still',
  'the',
  'people',
  'in',
  'a',
  'number',
  'of',
  'countries',
  'suffered',
  'a',
  'series',
  'of',
  'natural',
  'disasters',
  'that',
  'truly',
  'were',
  'dreadful.'],
 ['you',
  'have',
  'requested',
  'a',
  'debate',
  'on',
  'this',
  'subject',
  'in',
  'the',
  'course',
  'of',
  'the',
  'next',
  'few',
  'days,',
  'during',
  'this',
  'part-session.'],
 ['in',
  

In [17]:
fi_2.take(10)

[['istuntokauden', 'uudelleenavaaminen'],
 ['julistan',
  'perjantaina',
  'joulukuun',
  '17.',
  'päivänä',
  'keskeytetyn',
  'euroopan',
  'parlamentin',
  'istunnon',
  'avatuksi',
  'ja',
  'esitän',
  'vielä',
  'kerran',
  'vilpittömän',
  'toiveeni',
  'siitä,',
  'että',
  'teillä',
  'olisi',
  'ollut',
  'oikein',
  'mukava',
  'joululoma.'],
 ['kuten',
  'olette',
  'varmaan',
  'saattaneet',
  'huomata,',
  'vuodenvaihteeseen',
  '2000',
  'povattuja',
  'suuria',
  'tietokoneongelmia',
  'ei',
  'ilmennytkään.',
  'sen',
  'sijaan',
  'todella',
  'kauheat',
  'luonnonkatastrofit',
  'koettelivat',
  'kansalaisia',
  'joissakin',
  'unionimme',
  'maissa.'],
 ['te',
  'olette',
  'esittäneet',
  'toiveen,',
  'että',
  'tästä',
  'asiasta',
  'keskusteltaisiin',
  'lähipäivinä',
  'tämän',
  'istuntojakson',
  'aikana.'],
 ['sillä',
  'välin',
  'toivoisin,',
  'kuten',
  'useampi',
  'kollega',
  'on',
  'minulle',
  'esittänytkin,',
  'että',
  'viettäisimme',
  'minuu

In [21]:
## A2.3

total_lines_en2 = countno_Lines(en_2)
total_lines_fi2 = countno_Lines(fi_2)
print("Is line count same for the two laguages after pre processing? "+ str(total_lines_en2==total_lines_fi2) )

Is line count same for the two laguages after pre processing? True


In [20]:
# A3.1 Most frequent 20 words in english

en_2.flatMap(lambda a: a).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda a: a[1],ascending=False).take(10)

[('the', 3631787),
 ('of', 1724191),
 ('to', 1600135),
 ('and', 1339069),
 ('in', 1127219),
 ('that', 830095),
 ('a', 803908),
 ('is', 785410),
 ('for', 553730),
 ('we', 548086)]

In [22]:
# A3.1 Most frequent 10 words in Finnish

fi_2.flatMap(lambda a: a).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda a: a[1],ascending=False).take(10)

[('ja', 1249156),
 ('on', 1035956),
 ('että', 619655),
 ('euroopan', 257568),
 ('ei', 246268),
 ('myös', 178765),
 ('ovat', 161869),
 ('se', 152857),
 ('arvoisa', 149439),
 ('ole', 134745)]

In [23]:
# A 4.1 & 4.2 Key lines with line number and swappig key and value [English]

en_3 = en_2.zipWithIndex().map(lambda t : (t[1],t[0]))

In [24]:
# A 4.1 Key lines with line number and swapping key and value [Finnsih]

fi_3 = fi_2.zipWithIndex().map(lambda t : (t[1],t[0]))

In [25]:
# A 4.3 Join the 2 RDD's together

joined = en_3.join(fi_3)

In [28]:
# A 4.4 Filter missing sentence

filter_missing = joined.filter(lambda a: a[1][1] and a[1][0])

In [29]:
# A 4.5 and 4.6 Filter 

filter_words_per_sentence = filter_missing.filter(lambda a: ((len(a[1][1]) < 5) and (len(a[1][0]) < 5)) and (len(a[1][1])==len(a[1][0]))   )

In [30]:
# A 4.7 Filter and Mapping paired words

pairs = filter_words_per_sentence.map(lambda a: list(zip(a[1][0],a[1][1]))).flatMap(lambda a: a).map(lambda word: (word, 1))

In [31]:
# A 4.8 Reduce Pair Count

pair_count = pairs.reduceByKey(lambda a, b: a + b).sortBy(lambda a: a[1],ascending=False)

In [32]:
# A 4.9 Print 20 most frquent words

pair_count.take(20)

[(('.', '.'), 6971),
 (('\xa0\xa0', '\xa0\xa0'), 4996),
 (('(applause)', '(suosionosoituksia)'), 2803),
 (('is', 'on'), 2437),
 (('report', 'mietintö'), 1006),
 (('closed.', 'päättynyt.'), 797),
 (('debate', 'keskustelu'), 770),
 (('this', 'tämä'), 452),
 (('applause', 'suosionosoituksia'), 444),
 (('that', 'se'), 434),
 (('1.', '1.'), 428),
 (('2.', '2.'), 423),
 (('that', 'tämä'), 407),
 (('3.', '3.'), 393),
 (('it', 'se'), 352),
 (('-', '-'), 349),
 (('why?', 'miksi?'), 338),
 (('is', 'ei'), 320),
 (('minutes', 'pöytäkirja'), 307),
 (('see', 'ks.'), 304)]